提交 3aab21c0 编写于 作者: H Haojun Liao

[td-13039] fix bug in show tables/stables.

上级 46242e8a
......@@ -87,8 +87,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
uint32_t numOfRow2);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
......@@ -97,14 +97,13 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
......
......@@ -239,6 +239,56 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
return numOfRow1 + numOfRow2;
}
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows) {
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
if (numOfRows == 0) {
return numOfRows;
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
// Handle the bitmap
char* p = realloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * numOfRows);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->varmeta.offset = (int32_t*) p;
memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) {
char* tmp = realloc(pColumnInfoData->pData, pSource->varmeta.length);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->pData = tmp;
pColumnInfoData->varmeta.allocLen = pSource->varmeta.length;
}
memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
pColumnInfoData->varmeta.length = pSource->varmeta.length;
} else {
char* tmp = realloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->nullbitmap = tmp;
memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
int32_t newSize = numOfRows * pColumnInfoData->info.bytes;
tmp = realloc(pColumnInfoData->pData, newSize);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->pData = tmp;
memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
}
return 0;
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
ASSERT(pBlock);
......
......@@ -1447,8 +1447,8 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.update;
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
static void setInformationSchemaDbCfg(SDbObj* pDbObj) {
......
......@@ -440,8 +440,10 @@ typedef struct SSysTableScanInfo {
tsem_t ready;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void *pCur; // cursor for iterate the local table meta store.
int32_t type; // show type, TODO remove it
void *pCur; // cursor for iterate the local table meta store.
SArray *scanCols; // SArray<int16_t> scan column id list
int32_t type; // show type, TODO remove it
SName name;
SSDataBlock* pRes;
int32_t capacity;
......@@ -628,7 +630,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset,
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
......
......@@ -1221,9 +1221,9 @@ static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, in
for (int32_t k = 0; k < numOfOutput; ++k) {
if (pCtx[k].fpSet.init == NULL) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
memcpy(pColInfoData->pData, pCtx[k].input.pData[0]->pData, colDataGetLength(pColInfoData, pCtx[k].input.numOfRows));
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
} else { // TODO: arithmetic and other process.
ASSERT(0);
}
}
......@@ -4937,16 +4937,36 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
return TSDB_CODE_SUCCESS;
}
// TODO if only one or two columnss required, how to extract data?
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen,
int32_t numOfOutput, int64_t startTs, uint64_t* total) {
int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) {
blockDataEnsureCapacity(pRes, numOfRows);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
if (pColList == NULL) {
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
for (int32_t j = 0; j < numOfRows; ++j) {
colDataAppend(pColInfoData, j, pData, false);
pData += pColInfoData->info.bytes;
}
}
} else { // extract data acording to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList));
for(int32_t i = 0; i < numOfOutput; ++i) {
for(int32_t j = 0; j < numOfRows; ++j) {
colDataAppend(pColInfoData, j, pData, false);
pData += pColInfoData->info.bytes;
for(int32_t j = 0; j < numOfOutput; ++j) {
int16_t colIndex = *(int16_t*) taosArrayGet(pColList, j);
if (colIndex - 1 == i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, j);
for (int32_t k = 0; k < numOfRows; ++k) {
colDataAppend(pColInfoData, k, pData, false);
pData += pColInfoData->info.bytes;
}
break;
}
}
}
}
......@@ -5016,7 +5036,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) {
goto _error;
}
......@@ -5118,7 +5138,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
SSDataBlock* pRes = pExchangeInfo->pResult;
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows, NULL);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
......@@ -5460,12 +5480,17 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
}
blockDataClearup(pInfo->pRes, true);
SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 0);
char * name = NULL;
int32_t numOfRows = 0;
char n[TSDB_TABLE_NAME_LEN] = {0};
while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) {
colDataAppend(pTableNameCol, numOfRows, name, false);
STR_TO_VARSTR(n, name);
colDataAppend(pTableNameCol, numOfRows, n, false);
numOfRows += 1;
if (numOfRows >= pInfo->capacity) {
break;
......@@ -5518,7 +5543,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
return doFilterResult(pInfo);
}
......@@ -5527,7 +5552,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
}
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SExecTaskInfo* pTaskInfo) {
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5540,6 +5565,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
// TODO remove it
int32_t tableType = 0;
......@@ -5582,9 +5608,31 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
} else {
tsem_init(&pInfo->ready, 0, 0);
pInfo->epSet = epset;
}
pInfo->readHandle = pSysTableReadHandle;
#if 1
{ // todo refactor
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "DB-META";
rpcInit.numOfThreads = 1;
rpcInit.cfp = qProcessFetchRsp;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)"root";
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
pInfo->pTransporter = rpcOpen(&rpcInit);
if (pInfo->pTransporter == NULL) {
return NULL; // todo
}
}
#endif
}
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blockingOptr = false;
......@@ -5595,29 +5643,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
#if 1
{ // todo refactor
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "DB-META";
rpcInit.numOfThreads = 1;
rpcInit.cfp = qProcessFetchRsp;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)"root";
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
pInfo->pTransporter = rpcOpen(&rpcInit);
if (pInfo->pTransporter == NULL) {
return NULL; // todo
}
}
#endif
return pOperator;
}
......@@ -6375,15 +6400,11 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
pProjectInfo->existDataBlock = pBlock;
break;
} else { // init output buffer for a new group data
// for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
// aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
// }
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
}
}
// todo dynamic set tags
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
......@@ -6400,7 +6421,6 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
}
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
// resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
}
......@@ -8165,7 +8185,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, &pSysScanPhyNode->scan.tableName, pSysScanPhyNode->scan.node.pConditions, pSysScanPhyNode->mgmtEpSet, pTaskInfo);
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(pHandle->meta, pResBlock, &pScanNode->tableName,
pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo);
return pOperator;
} else {
ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册