diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8b708c3e0f6f06c1e784f3a7e3807371041716b8..dff0ef261fc4b655d13b43ad6f2b3222a0f356b7 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2742,7 +2742,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); // // SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)stbName, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false); // // char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; // tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB); @@ -2750,29 +2750,29 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // varDataSetLen(db, strlen(varDataVal(db))); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)db, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)db, false); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables +// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // if (pStb->commentLen > 0) { // char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0}; // STR_TO_VARSTR(comment, pStb->comment); -// colDataAppend(pColInfo, numOfRows, comment, false); +// colDataSetVal(pColInfo, numOfRows, comment, false); // } else if (pStb->commentLen == 0) { // char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0}; // STR_TO_VARSTR(comment, ""); -// colDataAppend(pColInfo, numOfRows, comment, false); +// colDataSetVal(pColInfo, numOfRows, comment, false); // } else { // colDataSetNULL(pColInfo, numOfRows); // } @@ -2782,14 +2782,14 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // varDataSetLen(watermark, strlen(varDataVal(watermark))); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)watermark, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false); // // char maxDelay[64 + VARSTR_HEADER_SIZE] = {0}; // sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]); // varDataSetLen(maxDelay, strlen(varDataVal(maxDelay))); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false); // // char rollup[160 + VARSTR_HEADER_SIZE] = {0}; // int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs); @@ -2808,7 +2808,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // varDataSetLen(rollup, strlen(varDataVal(rollup))); // // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataAppend(pColInfo, numOfRows, (const char *)rollup, false); +// colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false); // // numOfRows++; // sdbRelease(pSdb, pStb); @@ -3067,20 +3067,20 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p for (int32_t j = 0; j < pm->colNum; j++) { // table name SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0); - colDataAppend(pColInfoData, numOfRows, tName, false); + colDataSetVal(pColInfoData, numOfRows, tName, false); // database name pColInfoData = taosArrayGet(p->pDataBlock, 1); - colDataAppend(pColInfoData, numOfRows, dName, false); + colDataSetVal(pColInfoData, numOfRows, dName, false); pColInfoData = taosArrayGet(p->pDataBlock, 2); - colDataAppend(pColInfoData, numOfRows, typeName, false); + colDataSetVal(pColInfoData, numOfRows, typeName, false); // col name char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(colName, pm->schema[j].name); pColInfoData = taosArrayGet(p->pDataBlock, 3); - colDataAppend(pColInfoData, numOfRows, colName, false); + colDataSetVal(pColInfoData, numOfRows, colName, false); // col type int8_t colType = pm->schema[j].type; @@ -3095,10 +3095,10 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(colTypeStr, colTypeLen); - colDataAppend(pColInfoData, numOfRows, (char *)colTypeStr, false); + colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false); pColInfoData = taosArrayGet(p->pDataBlock, 5); - colDataAppend(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false); + colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false); for (int32_t k = 6; k <= 8; ++k) { pColInfoData = taosArrayGet(p->pDataBlock, k); colDataSetNULL(pColInfoData, numOfRows); @@ -3192,19 +3192,19 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB for (int i = 0; i < pStb->numOfColumns; i++) { int32_t cols = 0; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)stbName, false); + colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)db, false); + colDataSetVal(pColInfo, numOfRows, (const char *)db, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, typeName, false); + colDataSetVal(pColInfo, numOfRows, typeName, false); // col name char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(colName, pStb->pColumns[i].name); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, colName, false); + colDataSetVal(pColInfo, numOfRows, colName, false); // col type int8_t colType = pStb->pColumns[i].type; @@ -3219,10 +3219,10 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(colTypeStr, colTypeLen); - colDataAppend(pColInfo, numOfRows, (char *)colTypeStr, false); + colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false); while (cols < pShow->numOfColumns) { pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetNULL(pColInfo, numOfRows); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1fbdb255281d731478da87fbc00c26e0bb17a618..9b6d97d6e2030fdf089cef14c6c590629d647bad 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -503,6 +503,74 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } +static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) { + if (blockDataGetNumOfCols(pBlock) > 0) { + return TSDB_CODE_SUCCESS; + } + + int32_t numOfCols = taosArrayGetSize(pColIdList); + + if (numOfCols == 0) { // all columns are required + for (int32_t i = 0; i < pSchema->nCols; ++i) { + SSchema* pColSchema = &pSchema->pSchema[i]; + SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); + + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + blockDataFreeRes(pBlock); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + } else { + if (numOfCols > pSchema->nCols) { + numOfCols = pSchema->nCols; + } + + int32_t i = 0; + int32_t j = 0; + while (i < pSchema->nCols && j < numOfCols) { + SSchema* pColSchema = &pSchema->pSchema[i]; + col_id_t colIdSchema = pColSchema->colId; + + col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j); + if (colIdSchema < colIdNeed) { + i++; + } else if (colIdSchema > colIdNeed) { + j++; + } else { + SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } + i++; + j++; + } + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) { + int32_t code = TSDB_CODE_SUCCESS; + + if (IS_STR_DATA_TYPE(pColVal->type)) { + char val[65535 + 2] = {0}; + if (pColVal->value.pData != NULL) { + memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData); + varDataSetLen(val, pColVal->value.nData); + code = colDataSetVal(pColumnInfoData, rowIndex, val, !COL_VAL_IS_VALUE(pColVal)); + } else { + colDataSetNULL(pColumnInfoData, rowIndex); + } + } else { + code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal)); + } + + return code; +} + int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); @@ -538,53 +606,11 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet pReader->cachedSchemaSuid = suid; pReader->cachedSchemaVer = sversion; - SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - if (blockDataGetNumOfCols(pBlock) > 0) { - blockDataDestroy(pReader->pResBlock); - pReader->pResBlock = createDataBlock(); - pBlock = pReader->pResBlock; - - pBlock->info.id.uid = uid; - pBlock->info.version = pReader->msg.ver; - } - - int32_t numOfCols = taosArrayGetSize(pReader->pColIdList); - if (numOfCols == 0) { // all columns are required - for (int32_t i = 0; i < pSchemaWrapper->nCols; ++i) { - SSchema* pColSchema = &pSchemaWrapper->pSchema[i]; - SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS) { - blockDataFreeRes(pBlock); - return -1; - } - } - } else { - if (numOfCols > pSchemaWrapper->nCols) { - numOfCols = pSchemaWrapper->nCols; - } - - int32_t i = 0; - int32_t j = 0; - while (i < pSchemaWrapper->nCols && j < numOfCols) { - SSchema* pColSchema = &pSchemaWrapper->pSchema[i]; - col_id_t colIdSchema = pColSchema->colId; - - col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j); - if (colIdSchema < colIdNeed) { - i++; - } else if (colIdSchema > colIdNeed) { - j++; - } else { - SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS) { - return -1; - } - i++; - j++; - } + ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version); + if (blockDataGetNumOfCols(pBlock) == 0) { + int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList); + if (code != TSDB_CODE_SUCCESS) { + return code; } } } @@ -632,30 +658,15 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { tColDataGetValue(pCol, i, &colVal); - if (IS_STR_DATA_TYPE(colVal.type)) { - if (colVal.value.pData != NULL) { - char val[65535 + 2] = {0}; - memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); - varDataSetLen(val, colVal.value.nData); - if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - return -1; - } - } else { - colDataSetNULL(pColData, i); - } - } else { - if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - return -1; - } + int32_t code = doSetVal(pColData, i, &colVal); + if (code != TSDB_CODE_SUCCESS) { + return code; } } sourceIdx++; targetIdx++; } else { - for (int32_t i = 0; i < pCol->nVal; i++) { - colDataSetNULL(pColData, i); - } - + colDataSetNNULL(pColData, 0, pCol->nVal); targetIdx++; } } @@ -681,21 +692,9 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet sourceIdx++; continue; } else if (colVal.cid == pColData->info.colId) { - if (IS_STR_DATA_TYPE(colVal.type)) { - if (colVal.value.pData != NULL) { - char val[65535 + 2] = {0}; - memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); - varDataSetLen(val, colVal.value.nData); - if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - return -1; - } - } else { - colDataSetNULL(pColData, i); - } - } else { - if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - return -1; - } + int32_t code = doSetVal(pColData, i, &colVal); + if (code != TSDB_CODE_SUCCESS) { + return code; } sourceIdx++; @@ -833,14 +832,14 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas char val[65535 + 2]; memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); - if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } } else { colDataSetNULL(pColData, curRow - lastRow); } } else { - if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } } @@ -930,14 +929,14 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas char val[65535 + 2]; memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); - if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } } else { colDataSetNULL(pColData, curRow - lastRow); } } else { - if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { goto FAIL; } }