diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 6bd8b01842a35f93bc50ae718d47612aef034eb4..b1299c16e8129ca65fd7c4cba9cc2186e2360edf 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -183,7 +183,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { } string = cJSON_PrintUnformatted(json); -end: + end: cJSON_Delete(json); tFreeSMAltertbReq(&req); return string; @@ -205,7 +205,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { } string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); uDebug("processCreateStb %s", string); -_err: + _err: tDecoderClear(&coder); return string; } @@ -227,7 +227,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); uDebug("processAlterStb %s", string); -_err: + _err: tDecoderClear(&coder); return string; } @@ -309,7 +309,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON_AddItemToArray(tags, tag); } -end: + end: cJSON_AddItemToObject(json, "tags", tags); taosArrayDestroy(pTagVals); } @@ -368,7 +368,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { uDebug("processCreateTable :%s", string); } -_exit: + _exit: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -408,7 +408,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { } string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); uDebug("processAutoCreateTable :%s", string); -_exit: + _exit: for (int i = 0; i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); @@ -535,7 +535,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processAlterTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -569,7 +569,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processDropSTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -609,7 +609,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processDeleteTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&coder); return string; @@ -652,7 +652,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processDropTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -742,7 +742,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); tDecoderClear(&coder); @@ -839,7 +839,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tDecoderClear(&coder); return code; @@ -901,9 +901,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table @@ -987,7 +987,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; -end: + end: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -1058,9 +1058,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { @@ -1132,7 +1132,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } code = pRequest->code; -end: + end: taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); tDecoderClear(&coder); @@ -1201,7 +1201,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } taos_free_result(res); -end: + end: tDecoderClear(&coder); return code; } @@ -1249,9 +1249,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1311,7 +1311,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } -end: + end: taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); @@ -1399,7 +1399,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1481,7 +1481,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1601,6 +1601,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { uError("WriteRaw:rawBlockBindData failed"); goto end; } + taosMemoryFreeClear(pTableMeta); } code = smlBuildOutput(pQuery, pVgHash); @@ -1612,7 +1613,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: tDeleteSMqDataRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); @@ -1707,6 +1708,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); uError("WriteRaw: tDecodeSVCreateTbReq error"); code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -1715,15 +1717,19 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) if (pCreateReq.type != TSDB_CHILD_TABLE) { uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName); code = TSDB_CODE_TSC_INVALID_VALUE; + tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); goto end; } if (strcmp(tbName, pCreateReq.name) == 0) { cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); // pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb); tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); break; } tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); } SVgroupInfo vg; @@ -1774,6 +1780,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } pCreateReqDst = NULL; + taosMemoryFreeClear(pTableMeta); } code = smlBuildOutput(pQuery, pVgHash); @@ -1785,7 +1792,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: tDeleteSTaosxRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 2bb870837272a0422de500d56e89c7ca245a7beb..34808aa389b8da0cfcd10fc24edaec7c1b0a6a73 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2439,6 +2439,12 @@ _exit: int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, char *data) { int32_t code = 0; + if(data == NULL){ + for (int32_t i = 0; i < nRows; ++i) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); + } + goto _exit; + } if (IS_VAR_DATA_TYPE(type)) { // var-length data type for (int32_t i = 0; i < nRows; ++i) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1d7c213e1aff8c59fe1860896269ee34bc14aa25..9037644602034819fd1da260c1cef02b7ae29a97 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -139,8 +139,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); // tqExec -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); -// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index de2732fcb550f1332c48ec58521bb708c8a33618..81fa058f24333a4be9f77e622550a273d03fb665 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -538,123 +538,133 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); - } else { // for taosX - // todo handle the case where re-balance occurs. - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); - - if (offset.type != TMQ_OFFSET__LOG) { - if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { - return -1; - } + } - if (metaRsp.metaRspLen > 0) { - code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, - consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); - taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } + // todo handle the case where re-balance occurs. + // for taosx + SMqMetaRsp metaRsp = {0}; + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); - if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } else { - offset = taosxRsp.rspOffset; - } + if (offset.type != TMQ_OFFSET__LOG) { + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { + return -1; + } - tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",version:%" PRId64, - consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, - taosxRsp.rspOffset.version); + if (metaRsp.metaRspLen > 0) { + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 + ",ts:%" PRId64, + consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, + metaRsp.rspOffset.ts); + taosMemoryFree(metaRsp.metaRsp); + tDeleteSTaosxRsp(&taosxRsp); + return code; } - if (offset.type == TMQ_OFFSET__LOG) { - int64_t fetchVer = offset.version + 1; - pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); - if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + if (taosxRsp.blockNum > 0) { + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } else { + offset = taosxRsp.rspOffset; + } + + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + ",version:%" PRId64, + consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, + taosxRsp.rspOffset.version); + } + + if (offset.type == TMQ_OFFSET__LOG) { + int64_t fetchVer = offset.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + if (pCkHead == NULL) { + tDeleteSTaosxRsp(&taosxRsp); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + walSetReaderCapacity(pHandle->pWalReader, 2048); + int totalRows = 0; + while (1) { + // todo refactor: this is not correct. + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > pRequest->epoch) { + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 + ", found new consumer epoch %d, discard req epoch %d", + consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); + break; } - walSetReaderCapacity(pHandle->pWalReader, 2048); + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; + } - while (1) { - // todo refactor: this is not correct. - int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; - } + SWalCont* pHead = &pCkHead->head; + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, + pRequest->epoch, vgId, fetchVer, pHead->msgType); - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + // process meta + if (pHead->msgType != TDMT_VND_SUBMIT) { + if(totalRows > 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; } - SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", - consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); - - if (pHead->msgType == TDMT_VND_SUBMIT) { - SPackedData submit = { - .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), - .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), - .ver = pHead->version, - }; - - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, - pRequest->subKey); - return -1; - } - - if (taosxRsp.blockNum > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; - } else { - fetchVer++; - } - - } else { - /*A(pHandle->fetchMeta);*/ - /*A(IS_META_MSG(pHead->msgType));*/ - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { - code = -1; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - code = 0; + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { + code = -1; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); return code; } + code = 0; + taosMemoryFreeClear(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); + return code; } - } - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return 0; + // process data + SPackedData submit = { + .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), + .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), + .ver = pHead->version, + }; + + if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, + pRequest->subKey); + taosMemoryFreeClear(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); + return -1; + } + + if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; + } else { + fetchVer++; + } + } } + + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return 0; } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a62101eb4702442a580b2bfee33643106df41767..cbcd9180949f3e0a48c0c1149fe366a599a48a71 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -230,23 +230,15 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta return 0; } -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) { +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) { STqExecHandle* pExec = &pHandle->execHandle; - /*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/ - SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pSchemas = taosArrayInit(0, sizeof(void*)); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pExecReader; - /*tqReaderSetDataMsg(pReader, pReq, 0);*/ tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); while (tqNextDataBlock2(pReader)) { - /*SSDataBlock block = {0};*/ - /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ - /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ - /*}*/ - taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; @@ -254,7 +246,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { - /*int64_t uid = pExec->pExecReader->msgIter.uid;*/ int64_t uid = pExec->pExecReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); @@ -296,6 +287,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); + totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); taosArrayPush(pRsp->blockSchema, &pSW); @@ -304,13 +296,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReader* pReader = pExec->pExecReader; - /*tqReaderSetDataMsg(pReader, pReq, 0);*/ tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { - /*SSDataBlock block = {0};*/ - /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ - /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ - /*}*/ taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; @@ -355,15 +342,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR tEncoderClear(&encoder); } - /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ - /*pTq->pVnode->config.tsdbCfg.precision);*/ - /*blockDataFreeRes(&block);*/ - /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/ - /*pRsp->blockNum++;*/ for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); + *totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); taosArrayPush(pRsp->blockSchema, &pSW); @@ -373,9 +356,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } taosArrayDestroy(pBlocks); taosArrayDestroy(pSchemas); -// if (pRsp->blockNum == 0) { -// return -1; -// } - return 0; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 704f381afabeac392072ff9defc85963ac042244..4768231fb5fd362b5da4d2ae66af41ec20c19ecc 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -566,53 +566,19 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, return code; } -static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) { - bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); - if (NULL == pUseCols) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pBoundInfo->numOfBound = 0; - - int16_t lastColIdx = -1; // last column found - int32_t code = TSDB_CODE_SUCCESS; +static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) { for (int i = 0; i < numFields; i++) { - SToken token; - token.z = fields[i].name; - token.n = strlen(fields[i].name); - - int16_t t = lastColIdx + 1; - int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); - if (index < 0 && t > 0) { - index = insFindCol(&token, 0, t, pSchema); - } - if (index < 0) { - uError("can not find column name:%s", token.z); - code = TSDB_CODE_PAR_INVALID_COLUMN; - break; - } else if (pUseCols[index]) { - code = TSDB_CODE_PAR_INVALID_COLUMN; - uError("duplicated column name:%s", token.z); - break; - } else { - lastColIdx = index; - pUseCols[index] = true; - pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; - ++pBoundInfo->numOfBound; + if(strcmp(pSchema->name, fields[i].name) == 0){ + return true; } } - if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) { - uError("primary timestamp column can not be null:"); - code = TSDB_CODE_PAR_INVALID_COLUMN; - } - - taosMemoryFree(pUseCols); - return code; + return false; } int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int numFields, bool needChangeLength) { + void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); STableDataCxt* pTableCxt = NULL; int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); @@ -620,19 +586,14 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate uError("insGetTableDataCxt error"); goto end; } - if (tFields != NULL) { - ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields); + + if(tmp == NULL){ + ret = initTableColSubmitData(pTableCxt); if (ret != TSDB_CODE_SUCCESS) { - uError("bindFileds error"); + uError("initTableColSubmitData error"); goto end; } } - // no need to bind, because select * get all fields - ret = initTableColSubmitData(pTableCxt); - if (ret != TSDB_CODE_SUCCESS) { - uError("initTableColSubmitData error"); - goto end; - } char* p = (char*)data; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column @@ -660,35 +621,43 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta); SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo; - if (boundInfo->numOfBound != numOfCols) { - uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols); + if (numFields != numOfCols) { + uError("numFields:%d != numOfCols:%d", numFields, numOfCols); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } + if (numFields > boundInfo->numOfBound) { + uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound); ret = TSDB_CODE_INVALID_PARA; goto end; } for (int c = 0; c < boundInfo->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + SSchema* pColSchema = &pSchema[c]; SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); + if(findFileds(pColSchema, tFields, numFields)){ + if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { + uError("type or bytes not equal"); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } - if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { - uError("type or bytes not equal"); - ret = TSDB_CODE_INVALID_PARA; - goto end; - } - - int8_t* offset = pStart; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - pStart += numOfRows * sizeof(int32_t); - } else { - pStart += BitmapLen(numOfRows); - } - char* pData = pStart; + int8_t* offset = pStart; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + pStart += numOfRows * sizeof(int32_t); + } else { + pStart += BitmapLen(numOfRows); + } + char* pData = pStart; - tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); - fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength) { - pStart += htonl(colLength[c]); - } else { - pStart += colLength[c]; + tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); + fields += sizeof(int8_t) + sizeof(int32_t); + if (needChangeLength) { + pStart += htonl(colLength[c]); + } else { + pStart += colLength[c]; + } + }else{ + tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL); } } diff --git a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py index 6c49fae299ac68aac99892acf5de3011df0ab1a0..c8bcdd6235782f4a29cc18a4f243e8e53ed5ffe5 100644 --- a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py +++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py @@ -100,7 +100,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") @@ -131,7 +131,7 @@ class TDTestCase: 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'endTs': 0, - 'pollDelay': 10, + 'pollDelay': 20, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -193,7 +193,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....")