提交 4cc2d361 编写于 作者: wmmhello's avatar wmmhello

fix:error in getting snapshot for taosX

上级 c7bf08d5
......@@ -226,19 +226,19 @@ int32_t init_env() {
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table ct3 ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
......@@ -289,12 +289,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
......@@ -326,12 +326,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
......@@ -386,7 +386,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
......
......@@ -1405,6 +1405,11 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST){
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
code = TSDB_CODE_SUCCESS;
continue;
}
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
goto end;
......@@ -1472,7 +1477,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
SRowBuilder rb = {0};
tdSRowInit(&rb, sver);
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
int32_t dataLen = 0;
int32_t totalLen = 0;
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < pSW->nCols; i++) {
......@@ -1509,7 +1514,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
totalLen += rowLen;
}
taosHashCleanup(schemaHash);
......@@ -1518,8 +1523,8 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk->sversion = htonl(sver);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blk->dataLen = htonl(totalLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
subReq->numOfBlocks++;
taosMemoryFreeClear(pTableMeta);
}
......
此差异已折叠。
......@@ -283,6 +283,15 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
STbDbKey *tmp = (STbDbKey*)pKey;
if (tmp->version > ctx->snapVersion) break;
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
if(idData) {
continue;
}
if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) < 0) { // check if table exist for now, need optimize later
continue;
}
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
......@@ -290,16 +299,17 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
if(ctx->subType == TOPIC_SUB_TYPE__TABLE){
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){
tDecoderClear(&dc);
continue;
}
}
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
if(!idData){
taosArrayPush(ctx->idList, &tmp->uid);
metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
SIdInfo info = {0};
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
}
taosArrayPush(ctx->idList, &tmp->uid);
metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
SIdInfo info = {0};
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
tDecoderClear(&dc);
}
taosHashClear(ctx->idVersion);
......@@ -310,10 +320,11 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
STbDbKey *tmp = (STbDbKey*)pKey;
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
if(!idData){
SIdInfo info = {.version = tmp->version, .index = 0};
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
if(idData){
continue;
}
SIdInfo info = {.version = tmp->version, .index = 0};
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
SDecoder dc = {0};
SMetaEntry me = {0};
......@@ -322,6 +333,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
if(ctx->subType == TOPIC_SUB_TYPE__TABLE){
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){
tDecoderClear(&dc);
continue;
}
}
......
......@@ -403,6 +403,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}else{
fetchOffsetNew = dataRsp.rspOffset;
}
......@@ -434,7 +435,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// TODO add push mgr
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
......@@ -456,8 +456,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// TODO continue scan until meeting batch requirement
if (dataRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
......
......@@ -120,36 +120,51 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
}
}
qStreamExtractOffset(task, &pRsp->rspOffset);
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && qStreamExtractPrepareUid(task) != 0){
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
break;
}
}else{
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
if(qStreamExtractPrepareUid(task) != 0){
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
break;
}
if (pRsp->blockNum > 0){
tqDebug("tmqsnap task exec exited, get data");
break;
}
if (pRsp->blockNum > 0){
tqDebug("tmqsnap task exec exited, get data");
break;
}
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
tqDebug("tmqsnap task exec change to get data");
continue;
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
tqDebug("tmqsnap task exec change to get data");
continue;
}
*pMetaRsp = *tmp;
tqDebug("tmqsnap task exec exited, get meta");
}
*pMetaRsp = *tmp;
tqDebug("tmqsnap task exec exited, get meta");
tqDebug("task exec exited");
break;
}
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
ASSERT(0);
}
ASSERT(pRsp->rspOffset.type != 0);
return 0;
}
......
......@@ -847,6 +847,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
if(mtInfo.uid == 0) return 0; // no data
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
......
......@@ -1528,7 +1528,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){
SSDataBlock* pBlock = &pInfo->pRes;
if (tsdbNextDataBlock(pInfo->dataReader)) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册