diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 7e9a24ff1f560d0d56b0463c8377046c5d27da86..ed45bf3d72cafae876452dbc5b9e293af7f22cc2 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -302,8 +302,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); +// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/client/taos.h b/include/client/taos.h index f8af010aa65cb5354b148eb802cc91340981b9f8..5f147bb07cadbb7969a352fd5b0fb0191e732bb2 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -270,6 +270,7 @@ typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta); DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta); +DLL_EXPORT int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *res); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 5b96729503fb128f37f15b61045678b8e3aa9f01..893aa39ce3a74db33f6520bab7455a2331c74a0e 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -244,6 +244,7 @@ void *createRequest(uint64_t connId, int32_t type) { STscObj *pTscObj = acquireTscObj(connId); if (pTscObj == NULL) { + taosMemoryFree(pRequest); terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7093e14982f87bb214f259c66f1e07be0530b609..dbe1d9d4a47846fb5d5ae3eb19a3a8d3a175bcd5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -834,7 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { tstrerror(code), pRequest->requestId); STscObj* pTscObj = pRequest->pTscObj; - if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { + if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ca45202547a3f6abc940f13f520932983ca6862e..bb42e47642586dd2a45c21f639ec5dd7d3ff01d3 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1489,7 +1489,7 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr } info->id = smlGenId(); - info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery)); + info->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == info->pQuery) { uError("SML:0x%" PRIx64 " create info->pQuery error", info->id); goto cleanup; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 4c29adcdc0eda911a358cf28a609a789914dc5cc..88b305675f6b401ff6e1cacb32febd16c3c64ff5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -2952,9 +2952,241 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){ return TSDB_CODE_INVALID_PARA; } -void tmq_free_raw_meta(tmq_raw_data* rawMeta) { - // - taosMemoryFreeClear(rawMeta); +typedef struct{ + SVgroupInfo vg; + void *data; +}VgData; + +static void destroyVgHash(void* data) { + VgData* vgData = (VgData*)data; + taosMemoryFreeClear(vgData->data); +} + +int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ + if (!TD_RES_TMQ(msg)) { + uError("WriteRaw:msg is not tmq : %d", *(int8_t*)msg); + return TSDB_CODE_TMQ_INVALID_MSG; + } + + int32_t code = TSDB_CODE_SUCCESS; + SHashObj *pVgHash = NULL; + SQuery *pQuery = NULL; + + terrno = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); + if(!pRequest){ + uError("WriteRaw:createRequest error request is null"); + return terrno; + } + + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pVgHash, destroyVgHash); + struct SCatalog *pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if(code != TSDB_CODE_SUCCESS){ + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + SMqRspObj *rspObj = ((SMqRspObj*)msg); + while (++rspObj->resIter < rspObj->rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj->rsp.blockData, rspObj->resIter); + if (!rspObj->rsp.withSchema) { + uError("WriteRaw:no schema, iter:%d", rspObj->resIter); + goto end; + } + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj->rsp.blockSchema, rspObj->resIter); + setResSchemaInfo(&rspObj->resInfo, pSW->pSchema, pSW->nCols); + + code = setQueryResultFromRsp(&rspObj->resInfo, pRetrieve, false, false); + if(code != TSDB_CODE_SUCCESS){ + uError("WriteRaw: setQueryResultFromRsp error"); + goto end; + } + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for (int i = 0; i < pSW->nCols; i++) { + SSchema *schema = pSW->pSchema + i; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if(IS_VAR_DATA_TYPE(schema->type)){ + nVar ++; + } + } + + int32_t rows = rspObj->resInfo.numOfRows; + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); + int32_t schemaLen = 0; + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + const char* tbName = tmq_get_table_name(msg); + if(!tbName){ + uError("WriteRaw: tbname is null"); + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; + } + + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + strcpy(pName.dbname, pRequest->pDb); + strcpy(pName.tname, tbName); + + VgData vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg)); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; + } + + SSubmitReq* subReq = NULL; + SSubmitBlk* blk = NULL; + void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); + if(hData){ + vgData = *(VgData*)hData; + + int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; + void *tmp = taosMemoryRealloc(vgData.data, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + ((VgData*)hData)->data = tmp; + subReq = (SSubmitReq*)(vgData.data); + blk = POINTER_SHIFT(vgData.data, subReq->length); + }else{ + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + void *tmp = taosMemoryCalloc(1, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData)); + subReq = (SSubmitReq*)(vgData.data); + subReq->length = sizeof(SSubmitReq); + subReq->numOfBlocks = 0; + + blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq)); + } + + STableMeta** pTableMeta = NULL; + code = catalogGetTableMeta(pCatalog, &conn, &pName, pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName); + goto end; + } + uint64_t suid = (TSDB_NORMAL_TABLE == (*pTableMeta)->tableType ? 0 : (*pTableMeta)->suid); + uint64_t uid = (*pTableMeta)->uid; + taosMemoryFreeClear(*pTableMeta); + + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, pSW->version); + tdSRowSetTpInfo(&rb, pSW->nCols, fLen); + int32_t dataLen = 0; + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + + doSetOneRowPtr(&rspObj->resInfo); + rspObj->resInfo.current += 1; + + int32_t offset = 0; + for (int32_t k = 0; k < pSW->nCols; k++) { + const SSchema* pColumn = &pSW->pSchema[k]; + void *data = rspObj->resInfo.row[k]; + if (!data) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + if(IS_VAR_DATA_TYPE(pColumn->type)){ + data -= VARSTR_HEADER_SIZE; + } + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } + offset += TYPE_BYTES[pColumn->type]; + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(pSW->version); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htons(rows); + blk->dataLen = htonl(dataLen); + subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + subReq->numOfBlocks++; + } + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot); + nodeStmt->payloadType = PAYLOAD_TYPE_KV; + + int32_t numOfVg = taosHashGetSize(pVgHash); + nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + + VgData **vData = (VgData **)taosHashIterate(pVgHash, NULL); + while (vData) { + SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = (*vData)->vg; + SSubmitReq* subReq = (SSubmitReq*)((*vData)->data); + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + (*vData)->data = NULL; // no need free + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + taosArrayPush(nodeStmt->pDataBlocks, &dst); + vData = (VgData **)taosHashIterate(pVgHash, vData); + } + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; +end: + qDestroyQuery(pQuery); + destroyRequest(pRequest); + taosHashCleanup(pVgHash); + return code; } void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {