diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index e852489616f389a9a922ec251460e41dd98bccba..e0aacbfec9db8804b4003417689f404b78549afb 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -146,6 +146,9 @@ extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind); void tColDataSortMerge(SArray *colDataArr); +//for raw block +int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, + int32_t nRows, char* lengthOrbitmap, char *data); // for encode/decode int32_t tPutColData(uint8_t *pBuf, SColData *pColData); int32_t tGetColData(uint8_t *pBuf, SColData *pColData); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 34603fdb64f55dfc80a5b40abcd7719c5daac203..40bbe275eaa2a9fb9d77f5b89fd44e6f0a5eb576 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -115,6 +115,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); +int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq* pCreateTb); + int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 150194aa278deebc757d4108eb0f92469cae71f0..8c48a745f542dce0140f0d99690b14cecba5460e 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1201,16 +1201,6 @@ end: return code; } -typedef struct { - SVgroupInfo vg; - void* data; -} VgData; - -static void destroyVgHash(void* data) { - VgData* vgData = (VgData*)data; - taosMemoryFreeClear(vgData->data); -} - int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){ int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; @@ -1637,8 +1627,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { goto end; } - pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pVgHash, destroyVgHash); struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { @@ -1652,6 +1640,13 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + pQuery = smlInitHandle(); + if(pQuery == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + printf("raw data block num:%d\n", rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); @@ -1659,14 +1654,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { uError("WriteRaw:no schema, iter:%d", rspObj.resIter); goto end; } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); - setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); - - code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); - if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw: setQueryResultFromRsp error"); - goto end; - } const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); if (!tbName) { @@ -1680,13 +1667,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbName); - VgData vgData = {0}; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg)); - if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); - goto end; - } - code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); @@ -1698,164 +1678,29 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { goto end; } - uint16_t fLen = 0; - int32_t rowSize = 0; - int16_t nVar = 0; - for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { - SSchema* schema = &pTableMeta->schema[i]; - fLen += TYPE_BYTES[schema->type]; - rowSize += schema->bytes; - if (IS_VAR_DATA_TYPE(schema->type)) { - nVar++; - } - } - fLen -= sizeof(TSKEY); - - int32_t rows = rspObj.resInfo.numOfRows; - int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + - (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); - int32_t schemaLen = 0; - int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; - - SSubmitReq* subReq = NULL; - SSubmitBlk* blk = NULL; - void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); - if (hData) { - vgData = *(VgData*)hData; - - int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; - void* tmp = taosMemoryRealloc(vgData.data, totalLen); - if (tmp == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - vgData.data = tmp; - ((VgData*)hData)->data = tmp; - subReq = (SSubmitReq*)(vgData.data); - blk = POINTER_SHIFT(vgData.data, subReq->length); - } else { - int32_t totalLen = sizeof(SSubmitReq) + submitLen; - void* tmp = taosMemoryCalloc(1, totalLen); - if (tmp == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - vgData.data = tmp; - taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData)); - subReq = (SSubmitReq*)(vgData.data); - subReq->length = sizeof(SSubmitReq); - subReq->numOfBlocks = 0; - - blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq)); - } - - // pSW->pSchema should be same as pTableMeta->schema - // ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns); - uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); - uint64_t uid = pTableMeta->uid; - int16_t sver = pTableMeta->sversion; - - void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); - STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); - - SRowBuilder rb = {0}; - tdSRowInit(&rb, sver); - tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen); - int32_t totalLen = 0; - - SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - for (int i = 0; i < pSW->nCols; i++) { - SSchema* schema = &pSW->pSchema[i]; - taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); + SVgroupInfo vg; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; } - for (int32_t j = 0; j < rows; j++) { - tdSRowResetBuf(&rb, rowData); - - doSetOneRowPtr(&rspObj.resInfo); - rspObj.resInfo.current += 1; - - int32_t offset = 0; - for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { - const SSchema* pColumn = &pTableMeta->schema[k]; - int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); - if (!index) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k); - } else { - char* colData = rspObj.resInfo.row[*index]; - if (!colData) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); - } else { - if (IS_VAR_DATA_TYPE(pColumn->type)) { - colData -= VARSTR_HEADER_SIZE; - } - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); - } - } - if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) { - offset += TYPE_BYTES[pColumn->type]; - } - } - tdSRowEnd(&rb); - int32_t rowLen = TD_ROW_LEN(rowData); - rowData = POINTER_SHIFT(rowData, rowLen); - totalLen += rowLen; + void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); + if (hData == NULL) { + taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); } - taosHashCleanup(schemaHash); - blk->uid = htobe64(uid); - blk->suid = htobe64(suid); - blk->sversion = htonl(sver); - blk->schemaLen = htonl(schemaLen); - blk->numOfRows = htonl(rows); - blk->dataLen = htonl(totalLen); - subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen; - subReq->numOfBlocks++; - taosMemoryFreeClear(pTableMeta); - rspObj.resInfo.pRspMsg = NULL; - doFreeReqResultInfo(&rspObj.resInfo); - } - - pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); - if (NULL == pQuery) { - uError("create SQuery error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - pQuery->haveResultSet = false; - pQuery->msgType = TDMT_VND_SUBMIT; - pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); - if (NULL == pQuery->pRoot) { - uError("create pQuery->pRoot error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); - - int32_t numOfVg = taosHashGetSize(pVgHash); - nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); - - VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL); - while (vData) { - SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); - if (NULL == dst) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, NULL); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:rawBlockBindData failed"); goto end; } - dst->vg = vData->vg; - SSubmitReq* subReq = (SSubmitReq*)(vData->data); - dst->numOfTables = subReq->numOfBlocks; - dst->size = subReq->length; - dst->pData = (char*)subReq; - vData->data = NULL; // no need free - subReq->header.vgId = htonl(dst->vg.vgId); - subReq->version = htonl(1); - subReq->header.contLen = htonl(subReq->length); - subReq->length = htonl(subReq->length); - subReq->numOfBlocks = htonl(subReq->numOfBlocks); - taosArrayPush(nodeStmt->pDataBlocks, &dst); - vData = (VgData*)taosHashIterate(pVgHash, vData); + } + + code = smlBuildOutput(pQuery, pVgHash); + if (code != TSDB_CODE_SUCCESS) { + uError("smlBuildOutput failed"); + return code; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1863,8 +1708,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { end: tDeleteSMqDataRsp(&rspObj.rsp); - rspObj.resInfo.pRspMsg = NULL; - doFreeReqResultInfo(&rspObj.resInfo); tDecoderClear(&decoder); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1907,8 +1750,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } - pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pVgHash, destroyVgHash); struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { @@ -1922,6 +1763,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + pQuery = smlInitHandle(); + if(pQuery == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + printf("raw data block num:%d\n", rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); @@ -1929,14 +1777,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) uError("WriteRaw:no schema, iter:%d", rspObj.resIter); goto end; } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); - setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); - - code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); - if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw: setQueryResultFromRsp error"); - goto end; - } const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); if (!tbName) { @@ -1950,44 +1790,28 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbName); - VgData vgData = {0}; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg)); - if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); - goto end; - } - // find schema data info - int32_t schemaLen = 0; - void* schemaData = NULL; + SVCreateTbReq pCreateReq = {0}; + for (int j = 0; j < rspObj.rsp.createTableNum; j++) { void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); SDecoder decoderTmp = {0}; - SVCreateTbReq pCreateReq = {0}; - tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); + memset(&pCreateReq, 0, sizeof(SVCreateTbReq)); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); - taosMemoryFreeClear(pCreateReq.comment); - taosArrayDestroy(pCreateReq.ctb.tagName); goto end; } ASSERT(pCreateReq.type == TSDB_CHILD_TABLE); if (strcmp(tbName, pCreateReq.name) == 0) { - schemaLen = *lenTmp; - schemaData = *dataTmp; strcpy(pName.tname, pCreateReq.ctb.stbName); tDecoderClear(&decoderTmp); - taosMemoryFreeClear(pCreateReq.comment); - taosArrayDestroy(pCreateReq.ctb.tagName); break; } tDecoderClear(&decoderTmp); - taosMemoryFreeClear(pCreateReq.comment); - taosArrayDestroy(pCreateReq.ctb.tagName); } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); @@ -2001,167 +1825,23 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } - uint16_t fLen = 0; - int32_t rowSize = 0; - int16_t nVar = 0; - for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { - SSchema* schema = &pTableMeta->schema[i]; - fLen += TYPE_BYTES[schema->type]; - rowSize += schema->bytes; - if (IS_VAR_DATA_TYPE(schema->type)) { - nVar++; - } - } - fLen -= sizeof(TSKEY); - - int32_t rows = rspObj.resInfo.numOfRows; - int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + - (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); - - int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; - - SSubmitReq* subReq = NULL; - SSubmitBlk* blk = NULL; - void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); - if (hData) { - vgData = *(VgData*)hData; - - int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; - void* tmp = taosMemoryRealloc(vgData.data, totalLen); - if (tmp == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - vgData.data = tmp; - ((VgData*)hData)->data = tmp; - subReq = (SSubmitReq*)(vgData.data); - blk = POINTER_SHIFT(vgData.data, subReq->length); - } else { - int32_t totalLen = sizeof(SSubmitReq) + submitLen; - void* tmp = taosMemoryCalloc(1, totalLen); - if (tmp == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - vgData.data = tmp; - taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData)); - subReq = (SSubmitReq*)(vgData.data); - subReq->length = sizeof(SSubmitReq); - subReq->numOfBlocks = 0; - - blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq)); - } - - // pSW->pSchema should be same as pTableMeta->schema - // ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns); - uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); - uint64_t uid = pTableMeta->uid; - int16_t sver = pTableMeta->sversion; - - void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); - if (schemaData) { - memcpy(blkSchema, schemaData, schemaLen); - } - STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); - - SRowBuilder rb = {0}; - tdSRowInit(&rb, sver); - tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen); - int32_t totalLen = 0; - - SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - for (int i = 0; i < pSW->nCols; i++) { - SSchema* schema = &pSW->pSchema[i]; - taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); + SVgroupInfo vg; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; } - for (int32_t j = 0; j < rows; j++) { - tdSRowResetBuf(&rb, rowData); - - doSetOneRowPtr(&rspObj.resInfo); - rspObj.resInfo.current += 1; - - int32_t offset = 0; - for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { - const SSchema* pColumn = &pTableMeta->schema[k]; - int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); - if (!index) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k); - } else { - char* colData = rspObj.resInfo.row[*index]; - if (!colData) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); - } else { - if (IS_VAR_DATA_TYPE(pColumn->type)) { - colData -= VARSTR_HEADER_SIZE; - } - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); - } - } - if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) { - offset += TYPE_BYTES[pColumn->type]; - } - } - tdSRowEnd(&rb); - int32_t rowLen = TD_ROW_LEN(rowData); - rowData = POINTER_SHIFT(rowData, rowLen); - totalLen += rowLen; + void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); + if (hData == NULL) { + taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); } - taosHashCleanup(schemaHash); - blk->uid = htobe64(uid); - blk->suid = htobe64(suid); - blk->sversion = htonl(sver); - blk->schemaLen = htonl(schemaLen); - blk->numOfRows = htonl(rows); - blk->dataLen = htonl(totalLen); - subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen; - subReq->numOfBlocks++; - taosMemoryFreeClear(pTableMeta); - rspObj.resInfo.pRspMsg = NULL; - doFreeReqResultInfo(&rspObj.resInfo); - } - - pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); - if (NULL == pQuery) { - uError("create SQuery error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - pQuery->haveResultSet = false; - pQuery->msgType = TDMT_VND_SUBMIT; - pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); - if (NULL == pQuery->pRoot) { - uError("create pQuery->pRoot error"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); - - int32_t numOfVg = taosHashGetSize(pVgHash); - nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); - - VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL); - while (vData) { - SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); - if (NULL == dst) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, &pCreateReq); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:rawBlockBindData failed"); goto end; } - dst->vg = vData->vg; - SSubmitReq* subReq = (SSubmitReq*)(vData->data); - dst->numOfTables = subReq->numOfBlocks; - dst->size = subReq->length; - dst->pData = (char*)subReq; - vData->data = NULL; // no need free - subReq->header.vgId = htonl(dst->vg.vgId); - subReq->version = htonl(1); - subReq->header.contLen = htonl(subReq->length); - subReq->length = htonl(subReq->length); - subReq->numOfBlocks = htonl(subReq->numOfBlocks); - taosArrayPush(nodeStmt->pDataBlocks, &dst); - vData = (VgData*)taosHashIterate(pVgHash, vData); } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -2169,8 +1849,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) end: tDeleteSTaosxRsp(&rspObj.rsp); - rspObj.resInfo.pRspMsg = NULL; - doFreeReqResultInfo(&rspObj.resInfo); tDecoderClear(&decoder); qDestroyQuery(pQuery); destroyRequest(pRequest); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f6638fab60dd3ae3a680cd7d74774620d10dae92..11bd340f8e51e30c44eb5e80872f245c6013ce57 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2091,6 +2091,61 @@ _exit: return code; } +int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, + int32_t nRows, char* lengthOrbitmap, char *data) { + int32_t code = 0; + + if (IS_VAR_DATA_TYPE(type)) { // var-length data type + for (int32_t i = 0; i < nRows; ++i) { + int32_t offset = *((int32_t*)lengthOrbitmap + i); + if (offset == -1) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); + if (code) goto _exit; + } else { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE]( + pColData, (uint8_t *)varDataVal(data + offset), varDataLen(data + offset)); + } + } + } else { // fixed-length data type + bool allValue = true; + bool allNull = true; + for (int32_t i = 0; i < nRows; ++i) { + if(!colDataIsNull_f(lengthOrbitmap, i)){ + allNull = false; + }else{ + allValue = false; + } + } + + if (allValue) { + // optimize (todo) + for (int32_t i = 0; i < nRows; ++i) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE]( + pColData, (uint8_t *)data + bytes * i, bytes); + } + } else if (allNull) { + // optimize (todo) + for (int32_t i = 0; i < nRows; ++i) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); + if (code) goto _exit; + } + } else { + for (int32_t i = 0; i < nRows; ++i) { + if (colDataIsNull_f(lengthOrbitmap, i)) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); + if (code) goto _exit; + } else { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE]( + pColData, (uint8_t *)data + bytes * i, bytes); + } + } + } + } + + _exit: + return code; +} + int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) { int32_t code = 0; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 9507a546d75eb900eb4582560b7b634b481fe089..39c027f306fb3e1694ca483cebcbac0c39772115 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -20,6 +20,7 @@ #include "parUtil.h" #include "querynodes.h" #include "tRealloc.h" +#include "tdatablock.h" typedef struct SBlockKeyTuple { TSKEY skey; @@ -295,7 +296,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in } if (*dataBlocks == NULL) { - int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks); + int32_t ret = createTableDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -1371,3 +1372,77 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, return code; } + +int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq *pCreateTb){ + STableDataCxt* pTableCxt = NULL; + int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), + pTableMeta, &pCreateTb, &pTableCxt, true); + if (ret != TSDB_CODE_SUCCESS) { + uError("insGetTableDataCxt error"); + goto end; + } + + // no need to bind, because select * get all fields + ret = initTableColSubmitData(pTableCxt); + if (ret != TSDB_CODE_SUCCESS) { + uError( "initTableColSubmitData error"); + goto end; + } + + char* p = (char*)pRsp->data; + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + p += sizeof(int32_t); + p += sizeof(int32_t); + + int32_t numOfRows = *(int32_t*)p; + p += sizeof(int32_t); + + int32_t numOfCols = *(int32_t*)p; + p += sizeof(int32_t); + + p += sizeof(int32_t); + p += sizeof(uint64_t); + + int8_t *fields = p; + p += numOfCols * (sizeof(int8_t) + sizeof(int32_t)); + + int32_t* colLength = (int32_t*)p; + p += sizeof(int32_t) * numOfCols; + + char* pStart = p; + + SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta); + SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo; + + if(boundInfo->numOfBound != numOfCols){ + uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } + for (int c = 0; c < boundInfo->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); + + if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { + uError( "type or bytes not equal"); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } + + colLength[c] = htonl(colLength[c]); + int8_t* offset = pStart; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + pStart += numOfRows * sizeof(int32_t); + } else { + pStart += BitmapLen(numOfRows); + } + char *pData = pStart; + + tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); + fields += sizeof(int8_t) + sizeof(int32_t); + pStart += colLength[c]; + } + +end: + return ret; +} \ No newline at end of file