提交 f6e82764 编写于 作者: H Haojun Liao

fix(query): free the cursor when all data retrieved immediately.

上级 e5bb5675
...@@ -401,13 +401,12 @@ typedef struct SStreamBlockScanInfo { ...@@ -401,13 +401,12 @@ typedef struct SStreamBlockScanInfo {
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
SReadHandle readHandle;
SRetrieveMetaTableRsp* pRsp; SRetrieveMetaTableRsp* pRsp;
SRetrieveTableReq req; SRetrieveTableReq req;
SEpSet epSet; SEpSet epSet;
tsem_t ready; tsem_t ready;
SReadHandle readHandle;
int32_t accountId; int32_t accountId;
bool showRewrite; bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database SNode* pCondition; // db_name filter condition, to discard data that are not in current database
......
...@@ -304,23 +304,28 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) ...@@ -304,23 +304,28 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags } else { // these are tags
const char* p = NULL; const char* p = NULL;
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){ if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
const uint8_t *tmp = mr.me.ctbEntry.pTags; const uint8_t* tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){ char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if (data == NULL) {
metaReaderClear(&mr);
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1); qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return; return;
} }
*data = TSDB_DATA_TYPE_JSON; *data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp)); memcpy(data + 1, tmp, kvRowLen(tmp));
p = data; p = data;
}else{ } else {
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL)); colDataAppend(pColInfoData, i, p, (p == NULL));
} }
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
taosMemoryFree((void*)p); taosMemoryFree((void*)p);
} }
} }
...@@ -339,8 +344,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p ...@@ -339,8 +344,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
colInfoDataEnsureCapacity(&infoData, 0, 1); colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid); colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
SScalarParam srcParam = { SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData}; SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param); fpSet.process(&srcParam, 1, &param);
...@@ -1219,18 +1223,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1219,18 +1223,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// retrieve local table list info from vnode // retrieve local table list info from vnode
const char* name = tNameGetTableName(&pInfo->name); const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database. // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) { if (pInfo->readHandle.mnd != NULL) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo); doFilterResult(pInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else { } else {
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
...@@ -1256,7 +1260,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1256,7 +1260,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
while (metaTbCursorNext(pInfo->pCur) == 0) {
int32_t ret = 0;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name); STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
// table name // table name
...@@ -1339,6 +1345,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1339,6 +1345,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
} }
} }
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows; p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册