提交 94a3fa4f 编写于 作者: wmmhello's avatar wmmhello

fix:htol(length) & set vgId,uid correct for auto create table in taosx

上级 d7e15881
......@@ -1515,7 +1515,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed");
return code;
goto end;
}
launchQueryImpl(pRequest, pQuery, true, NULL);
......@@ -1538,6 +1538,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SMqTaosxRspObj rspObj = {0};
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SVCreateTbReq* pCreateReqDst = NULL;
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
......@@ -1605,17 +1606,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
strcpy(pName.tname, tbName);
// find schema data info
SVCreateTbReq pCreateReq = {0};
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0};
SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
memset(&pCreateReq, 0, sizeof(SVCreateTbReq));
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
uError("WriteRaw: tDecodeSVCreateTbReq error");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
......@@ -1625,13 +1626,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
if (strcmp(tbName, pCreateReq.name) == 0) {
strcpy(pName.tname, pCreateReq.ctb.stbName);
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
tDecoderClear(&decoderTmp);
break;
}
tDecoderClear(&decoderTmp);
}
if(pCreateReqDst){
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
}else{
strcpy(pName.tname, tbName);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
......@@ -1650,16 +1656,27 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
if(pCreateReqDst){
pTableMeta->vgId = vg.vgId;
pTableMeta->uid = pCreateReqDst->uid;
}
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, &pCreateReq, NULL, 0);
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
}
pCreateReqDst = NULL;
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed");
goto end;
}
launchQueryImpl(pRequest, pQuery, true, NULL);
......@@ -1672,6 +1689,10 @@ end:
destroyRequest(pRequest);
taosHashCleanup(pVgHash);
taosMemoryFreeClear(pTableMeta);
if (pCreateReqDst) {
tdDestroySVCreateTbReq(pCreateReqDst);
taosMemoryFree(pCreateReqDst);
}
return code;
}
......
......@@ -683,6 +683,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId,
TD_VID(pTq->pVnode), req.subKey);
return -1;
}
if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
......
......@@ -34,7 +34,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
return 0;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
if (pSW == NULL) {
return -1;
......@@ -43,7 +43,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
......@@ -153,7 +153,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
continue;
}
} else {
......@@ -163,7 +163,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
}
if (pRsp->withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
tqAddBlockSchemaToRsp(pExec, pRsp);
} else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
taosArrayPush(pRsp->blockSchema, &pSW);
......@@ -248,7 +248,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (pRsp->withTbName) {
/*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -364,9 +364,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
}
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
if (pRsp->blockNum == 0) {
return -1;
}
......
......@@ -70,6 +70,7 @@ static void msg_process(TAOS_RES* msg) {
tmq_get_raw(msg, &raw);
printf("write raw data type: %d\n", raw.raw_type);
int32_t ret = tmq_write_raw(pConn, raw);
ASSERT(ret == 0);
printf("write raw data: %s\n", tmq_err2str(ret));
tmq_free_raw(raw);
taos_close(pConn);
......@@ -361,7 +362,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
pRes = taos_query(pConn, "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
"stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册