You need to sign in or sign up before continuing.
未验证 提交 109df430 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13012 from taosdata/feature/3_liaohj

fix(query): free the cursor when all data retrieved immediately.
......@@ -401,13 +401,12 @@ typedef struct SStreamBlockScanInfo {
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
SReadHandle readHandle;
SRetrieveMetaTableRsp* pRsp;
SRetrieveTableReq req;
SEpSet epSet;
tsem_t ready;
SReadHandle readHandle;
int32_t accountId;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
......
......@@ -304,23 +304,28 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags
const char* p = NULL;
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
const uint8_t* tmp = mr.me.ctbEntry.pTags;
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if (data == NULL) {
metaReaderClear(&mr);
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
memcpy(data + 1, tmp, kvRowLen(tmp));
p = data;
}else{
} else {
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL));
}
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
taosMemoryFree((void*)p);
}
}
......@@ -339,8 +344,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
......@@ -1219,18 +1223,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// retrieve local table list info from vnode
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else {
if (pInfo->pCur == NULL) {
......@@ -1256,7 +1260,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
while (metaTbCursorNext(pInfo->pCur) == 0) {
int32_t ret = 0;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
// table name
......@@ -1339,6 +1345,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
}
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册