diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d84cbfe409170dc6d355ec27f9ad7cd471df76d6..c3dcdbd221c23b9feba1e97a949808e54cb346b8 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1505,7 +1505,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, NULL, 0); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); + if(fields == NULL){ + goto end; + } + for(int i = 0; i < pSW->nCols; i++){ + fields[i].type = pSW->pSchema[i].type; + fields[i].bytes = pSW->pSchema[i].bytes; + tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); + } + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols); + taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1665,7 +1676,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, NULL, 0); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); + if(fields == NULL){ + goto end; + } + for(int i = 0; i < pSW->nCols; i++){ + fields[i].type = pSW->pSchema[i].type; + fields[i].bytes = pSW->pSchema[i].bytes; + tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); + } + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols); + taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dcb63f65246cfc103790c2e46b2ca4c8bd4beafe..4eb0151c58945fbe869d9480a7708efa67fa4cac 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -39,7 +39,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dGError("msg:%p, not processed since no handler", pMsg); + dGError("msg:%p,info:%s not processed since no handler", pMsg, TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 12f7af3a717172a84cc3e8f3cd1948f39b842b07..bbc21d498bfb60fbc89f82308b2f22bc9db94d2b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1006,13 +1006,257 @@ FAIL: #endif int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock2(&block, pReader, pSubmitTbDataRet) == 0) { - taosArrayPush(blocks, &block); - SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper); - taosArrayPush(schemas, &pSW); - return 0; + tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); + + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); + pReader->nextBlk++; + + if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + int32_t sversion = pSubmitTbData->sver; + int64_t suid = pSubmitTbData->suid; + int64_t uid = pSubmitTbData->uid; + pReader->lastBlkUid = uid; + + taosMemoryFree(pReader->pSchema); + pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchema == NULL) { + tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 + "), version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + + tDeleteSSchemaWrapper(pReader->pSchemaWrapper); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchemaWrapper == NULL) { + tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + + STSchema* pTschema = pReader->pSchema; + SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; + int32_t numOfRows = 0; + + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SArray* pCols = pSubmitTbData->aCol; + SColData* pCol = taosArrayGet(pCols, 0); + numOfRows = pCol->nVal; + } else { + SArray* pRows = pSubmitTbData->aRowP; + numOfRows = taosArrayGetSize(pRows); + } + + int32_t curRow = 0; + int32_t lastRow = 0; + char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); + if (assigned == NULL) return -1; + + // convert and scan one block + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SArray* pCols = pSubmitTbData->aCol; + int32_t numOfCols = taosArrayGetSize(pCols); + for (int32_t i = 0; i < numOfRows; i++) { + bool buildNew = false; + + for (int32_t j = 0; j < numOfCols; j++){ + SColData* pCol = taosArrayGet(pCols, j); + SColVal colVal; + tColDataGetValue(pCol, i, &colVal); + if (curRow == 0) { + assigned[j] = !COL_VAL_IS_NONE(&colVal); + buildNew = true; + } else { + bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); + if (currentRowAssigned != assigned[j]) { + assigned[j] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + + SSDataBlock block = {0}; + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if(pSW == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) { + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); + + block.info.id.uid = uid; + block.info.version = pReader->msg2.ver; + if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &pSW); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + + int32_t targetIdx = 0; + int32_t sourceIdx = 0; + int32_t colActual = blockDataGetNumOfCols(pBlock); + while (targetIdx < colActual) { + SColData* pCol = taosArrayGet(pCols, sourceIdx); + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + SColVal colVal; + + if (pCol->cid < pColData->info.colId) { + sourceIdx++; + } else if (pCol->cid == pColData->info.colId) { + tColDataGetValue(pCol, i, &colVal); + + if (IS_STR_DATA_TYPE(colVal.type)) { + if (colVal.value.pData != NULL) { + char val[65535 + 2]; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } else { + colDataAppendNULL(pColData, curRow - lastRow); + } + } else { + if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } + sourceIdx++; + targetIdx++; + } + } + + curRow++; + } + } else { + SArray* pRows = pSubmitTbData->aRowP; + for (int32_t i = 0; i < numOfRows; i++) { + SRow* pRow = taosArrayGetP(pRows, i); + bool buildNew = false; + + for (int32_t j = 0; j < pTschema->numOfCols; j++){ + SColVal colVal; + tRowGet(pRow, pTschema, j, &colVal); + if (curRow == 0) { + assigned[j] = !COL_VAL_IS_NONE(&colVal); + buildNew = true; + } else { + bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); + if (currentRowAssigned != assigned[j]) { + assigned[j] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + + SSDataBlock block = {0}; + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if(pSW == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) { + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); + + block.info.id.uid = uid; + block.info.version = pReader->msg2.ver; + if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &pSW); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + + int32_t targetIdx = 0; + int32_t sourceIdx = 0; + int32_t colActual = blockDataGetNumOfCols(pBlock); + while (targetIdx < colActual) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + SColVal colVal; + tRowGet(pRow, pTschema, sourceIdx, &colVal); + + if (colVal.cid < pColData->info.colId) { + sourceIdx++; + } else if (colVal.cid == pColData->info.colId) { + if (IS_STR_DATA_TYPE(colVal.type)) { + if (colVal.value.pData != NULL) { + char val[65535 + 2]; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } else { + colDataAppendNULL(pColData, curRow - lastRow); + } + } else { + if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } + sourceIdx++; + targetIdx++; + } + } + curRow++; + } } + + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + + taosMemoryFree(assigned); + return 0; + + FAIL: + taosMemoryFree(assigned); return -1; } diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 7ca18f293cddf415a2b4ac3354f5848171e7b736..c056451978d0ac2527d326ca02553c9ff3b68c52 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -45,7 +45,7 @@ class TDTestCase: if drop: tdSql.checkRows(10) else: - tdSql.checkRows(15) + tdSql.checkRows(16) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -63,20 +63,20 @@ class TDTestCase: tdSql.checkData(1, 5, "sttb4") tdSql.query("select * from stt order by ts") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 21) + tdSql.checkData(2, 1, 21) tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 21) + tdSql.checkData(2, 2, 21) tdSql.checkData(0, 5, "stt3") - tdSql.checkData(1, 5, "stt4") + tdSql.checkData(2, 5, "stt4") tdSql.execute('use abc1') tdSql.query("show tables") if drop: tdSql.checkRows(10) else: - tdSql.checkRows(15) + tdSql.checkRows(16) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -94,13 +94,13 @@ class TDTestCase: tdSql.checkData(1, 5, "sttb4") tdSql.query("select * from stt order by ts") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 21) + tdSql.checkData(2, 1, 21) tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 21) + tdSql.checkData(2, 2, 21) tdSql.checkData(0, 5, "stt3") - tdSql.checkData(1, 5, "stt4") + tdSql.checkData(2, 5, "stt4") return