提交 7ed966e3 编写于 作者: K kailixu

fix: select ins_columns from stb/tb

上级 a5e6bac8
......@@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pReq->info.rsp = pRsp;
pReq->info.rspLen = size;
if (rowsRead == 0 || rowsRead < rowsToRead) {
if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->pIter)) {
pRsp->completed = 1;
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj(pShow, true);
......
......@@ -3113,22 +3113,22 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->pIter ? false : true;
while (numOfRows < rows) {
void *prevIter = pShow->pIter;
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
sdbGet(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (!pStb) continue;
}
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->pIter = prevIter;
sdbRelease(pSdb, pStb);
break;
}
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
......@@ -3136,6 +3136,16 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
......
......@@ -312,6 +312,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock);
*/
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status, bool lock);
void sdbGet(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
/**
* @brief Cancel a traversal
......
......@@ -390,6 +390,31 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
return ppRow;
}
void sdbGet(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = NULL;
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return;
size_t keyLen = 0;
void *pKey = taosHashGetKey(pIter, &keyLen);
sdbReadLock(pSdb, type);
SSdbRow **ppRow = (SSdbRow **)taosHashGet(hash, pKey, keyLen);
if (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
sdbUnLock(pSdb, type);
return;
}
atomic_add_fetch_32(&pRow->refCount, 1);
*ppObj = pRow->pObj;
}
sdbUnLock(pSdb, type);
}
void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status, bool lock) {
*ppObj = NULL;
......
......@@ -60,6 +60,7 @@ typedef struct SSysTableScanInfo {
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store.
SSysTableIndex* pIdx; // idx for local table meta
SHashObj* pSchema;
SColMatchInfo matchInfo;
SName name;
SSDataBlock* pRes;
......@@ -514,8 +515,11 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
if (pInfo->pSchema == NULL) {
pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash);
}
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
......@@ -524,33 +528,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table");
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
}
continue;
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table");
STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if (schema != NULL) {
schemaRow = *(SSchemaWrapper**)schema;
} else {
tDecoderClear(&pInfo->pCur->mr.coder);
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
SMetaReader metaReader = {0};
metaReaderInit(&metaReader, pInfo->readHandle.meta, 0);
int code = metaGetTableEntryByUid(&metaReader, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
metaReaderClear(&metaReader);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
taosHashCleanup(stableSchema);
return NULL;
}
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&metaReader.me.stbEntry.schemaRow);
taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
schemaRow = schemaWrapper;
metaReaderClear(&metaReader);
}
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table");
......@@ -565,7 +572,6 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
metaTbCursorPrev(pInfo->pCur);
if (pInfo->pRes->info.rows > 0) {
......@@ -576,8 +582,6 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
}
taosHashCleanup(stableSchema);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
......@@ -1791,6 +1795,11 @@ void destroySysScanOperator(void* param) {
pInfo->pIdx = NULL;
}
if(pInfo->pSchema) {
taosHashCleanup(pInfo->pSchema);
pInfo->pSchema = NULL;
}
taosArrayDestroy(pInfo->matchInfo.pList);
taosMemoryFreeClear(pInfo->pUser);
......
......@@ -22,7 +22,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
self.setsql = TDSetSql()
self.dbname = 'db'
self.stbname = 'stb'
......@@ -101,18 +101,47 @@ class TDTestCase:
tdSql.checkEqual(i[1],len(self.perf_list))
elif i[0].lower() == self.dbname:
tdSql.checkEqual(i[1],self.tbnum+1)
def ins_columns_check(self):
tdSql.execute('create database db2 vgroups 2 replica 1')
tdSql.execute('create table db2.stb2 (ts timestamp,c0 int,c1 int, c2 double, c3 float, c4 binary(1000), c5 nchar(100),c7 bigint, c8 bool, c9 smallint) tags(t0 int)')
for i in range(2000):
tdSql.execute("create table db2.ctb%d using db2.stb2 tags(%d)" %(i,i))
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type="CHILD_TABLE"')
tdSql.checkEqual(20000,len(tdSql.queryResult))
print("number of ins_columns of child table in db2 is %s" % len(tdSql.queryResult))
# def ins_columns_check(self):
# tdSql.execute('drop database if exists db2')
# tdSql.execute('create database if not exists db2 vgroups 2 replica 1')
# tdSql.execute('create table db2.stb2 (ts timestamp,c0 int,c1 int, c2 double, c3 float, c4 binary(1000), c5 nchar(100),c7 bigint, c8 bool, c9 smallint) tags(t0 int)')
# for i in range(2000):
# tdSql.execute("create table db2.ctb%d using db2.stb2 tags(%d)" %(i,i))
# tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type="CHILD_TABLE"')
# tdSql.checkEqual(20000,len(tdSql.queryResult))
# print("number of ins_columns of child table in db2 is %s" % len(tdSql.queryResult))
# def ins_columns_check(self):
# tdSql.execute('drop database if exists db2')
# tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
# for i in range (5):
# self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i)
# for j in range (4094 - i):
# # for j in range (499):
# self.stb4096 += ', c%d int' % (j)
# self.stb4096 += ') tags (t1 int)'
# tdSql.execute(self.stb4096)
# # print ("stb sql is %s" % (self.stb4096))
# for k in range(10):
# tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k))
# tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"')
# tdSql.checkEqual(20465,len(tdSql.queryResult))
# tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"')
# tdSql.checkEqual(20465,len(tdSql.queryResult))
# for j in range (5):
# self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (j)
# for i in range (4095 - j):
# self.ntb4096 += ', c%d binary(10)' % (i)
# self.ntb4096 += ')'
# tdSql.execute(self.ntb4096)
# # print ("ntb sql is %s" % (self.ntb4096))
# tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
# tdSql.checkEqual(20000,len(tdSql.queryResult))
def run(self):
self.prepare_data()
self.count_check()
self.ins_columns_check()
# self.ins_columns_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册