提交 c381ffd3 编写于 作者: wmmhello's avatar wmmhello

fix:error in tmq for snapshot

上级 507adf9e
......@@ -219,19 +219,19 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table ct3 ct1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
......@@ -282,12 +282,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
......@@ -319,12 +319,12 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
taos_close(pConn);
return 0;
......
......@@ -189,6 +189,12 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
......
......@@ -156,8 +156,14 @@ typedef struct SMetaTableInfo{
int64_t suid;
int64_t uid;
SSchemaWrapper *schema;
char tbName[TSDB_TABLE_NAME_LEN];
}SMetaTableInfo;
typedef struct SIdInfo{
int64_t version;
int32_t index;
}SIdInfo;
typedef struct SSnapContext {
SMeta *pMeta;
int64_t snapVersion;
......@@ -166,6 +172,8 @@ typedef struct SSnapContext {
int8_t subType;
SHashObj *idVersion;
SHashObj *suidInfo;
SArray *idList;
int32_t index;
bool withMeta;
bool queryMetaOrData; // true-get meta, false-get data
}SSnapContext;
......
......@@ -200,21 +200,59 @@ typedef struct STableInfoForChildTable{
char *tableName;
SArray *tagName;
SSchemaWrapper *schemaRow;
SSchemaWrapper *tagRow;
}STableInfoForChildTable;
static void destroySTableInfoForChildTable(void* data) {
STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
taosMemoryFree(pData->tagName);
taosMemoryFree(pData->tableName);
taosArrayDestroy(pData->tagName);
tDeleteSSchemaWrapper(pData->schemaRow);
}
static void clearAndMoveToFirst(SSnapContext* ctx){
static void MoveToSnapShotVersion(SSnapContext* ctx){
tdbTbcClose(ctx->pCur);
tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
int c = 0;
tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
if(c < 0){
tdbTbcMoveToPrev(ctx->pCur);
}
}
static void MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid){
tdbTbcClose(ctx->pCur);
tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
STbDbKey key = {.version = ver, .uid = uid};
int c = 0;
tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
ASSERT(c == 0);
}
static void MoveToFirst(SSnapContext* ctx){
tdbTbcClose(ctx->pCur);
tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
tdbTbcMoveToFirst(ctx->pCur);
}
static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
if(data){
return;
}
STableInfoForChildTable dataTmp = {0};
dataTmp.tableName = strdup(me->name);
dataTmp.tagName = taosArrayInit(me->stbEntry.schemaTag.nCols, TSDB_COL_NAME_LEN);
for(int i = 0; i < me->stbEntry.schemaTag.nCols; i++){
SSchema *schema = &me->stbEntry.schemaTag.pSchema[i];
taosArrayPush(dataTmp.tagName, schema->name);
}
dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet){
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if(ctx == NULL) return -1;
......@@ -225,10 +263,6 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
ctx->subType = subType;
ctx->queryMetaOrData = withMeta;
ctx->withMeta = withMeta;
int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL);
if (ret < 0) {
return -1;
}
ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if(ctx->idVersion == NULL){
return -1;
......@@ -240,25 +274,84 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
}
taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
ctx->index = 0;
ctx->idList = taosArrayInit(100, sizeof(int64_t));
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int vLen = 0, kLen = 0;
tdbTbcMoveToFirst(ctx->pCur);
metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
MoveToFirst(ctx);
while(1){
int32_t ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
STbDbKey *tmp = (STbDbKey*)pKey;
if (tmp->version > ctx->snapVersion) break;
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
metaDecodeEntry(&dc, &me);
if(ctx->subType == TOPIC_SUB_TYPE__TABLE){
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){
continue;
}
}
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
if(!idData){
taosArrayPush(ctx->idList, &tmp->uid);
metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
SIdInfo info = {0};
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
}
}
taosHashClear(ctx->idVersion);
MoveToSnapShotVersion(ctx);
while(1){
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
int32_t ret = tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) break;
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t));
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
if(!idData){
SIdInfo info = {.version = tmp->version, .index = 0};
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
}
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
metaDecodeEntry(&dc, &me);
if(ctx->subType == TOPIC_SUB_TYPE__TABLE){
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){
continue;
}
}
if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE)
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
}
tDecoderClear(&dc);
}
for(int i = 0; i < taosArrayGetSize(ctx->idList); i++){
int64_t *uid = taosArrayGet(ctx->idList, i);
SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
ASSERT(idData);
idData->index = i;
metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index);
}
clearAndMoveToFirst(ctx);
return TDB_CODE_SUCCESS;
}
int32_t destroySnapContext(SSnapContext* ctx){
tdbTbcClose(ctx->pCur);
taosArrayDestroy(ctx->idList);
taosHashCleanup(ctx->idVersion);
taosHashCleanup(ctx->suidInfo);
taosMemoryFree(ctx);
......@@ -327,42 +420,20 @@ static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *co
return 0;
}
static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){
STableInfoForChildTable dataTmp = {0};
dataTmp.tableName = strdup(me->name);
dataTmp.tagName = taosArrayInit(me->stbEntry.schemaTag.nCols, TSDB_COL_NAME_LEN);
for(int i = 0; i < me->stbEntry.schemaTag.nCols; i++){
SSchema *schema = &me->stbEntry.schemaTag.pSchema[i];
taosArrayPush(dataTmp.tagName, schema->name);
}
dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
if(data){
destroySTableInfoForChildTable(data);
}
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid){
int c = 0;
if(uid == -1){
return c;
}
if(uid == 0){
clearAndMoveToFirst(ctx);
ctx->index = 0;
return c;
}
int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
if(!ver){
SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
if(!idInfo){
return -1;
}
STbDbKey key = {.version = *ver, .uid = uid};
tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
ctx->index = idInfo->index;
return c;
}
......@@ -371,57 +442,46 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
int32_t ret = 0;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int vLen = 0, kLen = 0;
while(1){
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) {
if(ctx->index >= taosArrayGetSize(ctx->idList)){
metaDebug("tmqsnap get meta end");
ctx->index = 0;
ctx->queryMetaOrData = false; // change to get data
clearAndMoveToFirst(ctx);
return 0;
}
STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) {
clearAndMoveToFirst(ctx);
ctx->queryMetaOrData = false; // change to get data
return 0;
}
int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
ASSERT(ver);
if(*ver > tmp->version){
continue;
}
ASSERT(*ver == tmp->version);
int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
ctx->index++;
SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
ASSERT(idInfo);
*uid = tmp->uid;
*uid = *uidTmp;
MoveToPosition(ctx, idInfo->version, *uidTmp);
tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
metaDecodeEntry(&dc, &me);
metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index-1);
if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE)
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
SVCreateStbReq req = {0};
req.name = me.name;
req.suid = me.uid;
req.schemaRow = me.stbEntry.schemaRow;
req.schemaTag = me.stbEntry.schemaTag;
req.schemaRow.version = 1;
req.schemaTag.version = 1;
ret = buildSuperTableInfo(&req, pBuf, contLen);
tDecoderClear(&dc);
*type = TDMT_VND_CREATE_STB;
break;
} else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE)
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
if(!data){ // if table has been deleted
tDecoderClear(&dc);
continue;
}
ASSERT(data);
SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
......@@ -431,12 +491,26 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
req.ctb.suid = me.ctbEntry.suid;
req.ctb.tagNum = taosArrayGetSize(data->tagName);
req.ctb.name = data->tableName;
// SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
// if(sidInfo->version >= idInfo->version){
// // need parse tag
// STag* p = (STag*)me.ctbEntry.pTags;
// SArray* pTagVals = NULL;
// if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
// }
//
// int16_t nCols = taosArrayGetSize(pTagVals);
// for (int j = 0; j < nCols; ++j) {
// STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
// }
// }else{
req.ctb.pTag = me.ctbEntry.pTags;
// }
req.ctb.tagName = data->tagName;
ret = buildNormalChildTableInfo(&req, pBuf, contLen);
tDecoderClear(&dc);
*type = TDMT_VND_CREATE_TABLE;
break;
} else if(ctx->subType == TOPIC_SUB_TYPE__DB){
SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
......@@ -445,68 +519,64 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
req.commentLen = -1;
req.ntb.schemaRow = me.ntbEntry.schemaRow;
ret = buildNormalChildTableInfo(&req, pBuf, contLen);
tDecoderClear(&dc);
*type = TDMT_VND_CREATE_TABLE;
break;
} else{
tDecoderClear(&dc);
continue;
}
ASSERT(0);
}
tDecoderClear(&dc);
return ret;
}
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){
SMetaTableInfo result = {0};
int32_t ret = 0;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
while(1){
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) {
if(ctx->index >= taosArrayGetSize(ctx->idList)){
metaDebug("tmqsnap get uid info end");
return result;
}
int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
ctx->index++;
SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
ASSERT(idInfo);
STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) {
return result;
}
int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
ASSERT(ver);
if(*ver > tmp->version){
continue;
}
ASSERT(*ver == tmp->version);
MoveToPosition(ctx, idInfo->version, *uidTmp);
tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen);
metaDecodeEntry(&dc, &me);
metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index-1);
if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE){
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
result.uid = me.uid;
result.suid = me.ctbEntry.suid;
result.schema = data->schemaRow;
result.schema = tCloneSSchemaWrapper(data->schemaRow);
strcpy(result.tbName, me.name);
tDecoderClear(&dc);
break;
} else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
result.uid = me.uid;
result.suid = 0;
result.schema = &me.ntbEntry.schemaRow;
strcpy(result.tbName, me.name);
result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
tDecoderClear(&dc);
break;
} else if(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) {
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
result.uid = me.uid;
result.suid = me.ctbEntry.suid;
result.schema = data->schemaRow;
strcpy(result.tbName, me.name);
result.schema = tCloneSSchemaWrapper(data->schemaRow);
tDecoderClear(&dc);
break;
} else{
metaDebug("tmqsnap get uid continue");
tDecoderClear(&dc);
continue;
}
......
......@@ -464,6 +464,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta;
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
if (pRef == NULL) {
......@@ -480,10 +481,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true,
.version = ver,
};
pHandle->snapshotVer = ver;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver;
req.qmsg = NULL;
pHandle->execHandle.task =
......
......@@ -83,26 +83,32 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("task start to execute");
tqDebug("tmqsnap task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
tqDebug("task execute end, get %p", pDataBlock);
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) {
if (pRsp->withTbName) {
int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) {
uid = pExec->pExecReader->msgIter.uid;
} else {
uid = pDataBlock->info.uid;
}
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} else {
char* tbName = strdup(qExtractTbnameFromTask(task));
taosArrayPush(pRsp->blockTbName, &tbName);
}
}
if(pRsp->withSchema){
if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddBlockSchemaToRsp(pExec, pRsp);
}else{
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
taosArrayPush(pRsp->blockSchema, &pSW);
}
}
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
pRsp->blockNum++;
......@@ -114,8 +120,11 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
}
}
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && qStreamExtractPrepareUid(task) != 0){
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
......@@ -124,7 +133,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
if (pRsp->blockNum > 0){
qStreamExtractOffset(task, &pRsp->rspOffset);
tqDebug("task exec exited, get data");
tqDebug("tmqsnap task exec exited, get data");
break;
}
......@@ -132,12 +141,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
tqDebug("task exec change to get meta");
tqDebug("tmqsnap task exec change to get data");
continue;
}
*pMetaRsp = *tmp;
tqDebug("task exec exited, get meta");
tqDebug("tmqsnap task exec exited, get meta");
break;
}
......
......@@ -150,6 +150,8 @@ typedef struct {
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
SSchemaWrapper *schema;
char tbName[TSDB_TABLE_NAME_LEN];
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
......
......@@ -605,9 +605,14 @@ void* qExtractReaderFromStreamScanner(void* scanner) {
return (void*)pInfo->tqReader;
}
const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return pInfo->tqReader->pSchemaWrapper;
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.schema;
}
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.tbName;
}
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
......@@ -616,6 +621,12 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
return &pTaskInfo->streamInfo.metaRsp;
}
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
return pTaskInfo->streamInfo.prepareStatus.uid;
}
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
......@@ -752,14 +763,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}
SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
if(pOffset->uid == 0) pOffset->uid = mtInfo.uid;
if(pOffset->ts == 0) pOffset->ts = INT64_MIN;
if (pOffset->uid == 0) {
qError("setDataForSnapShot error. uid = 0 ");
return -1;
}
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
......@@ -770,7 +773,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts);
}else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
......@@ -778,10 +784,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid);
return -1;
}
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid);
}else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
qDebug("tmqsnap qStreamPrepareScan snapshot log");
}
return 0;
}
......@@ -1466,11 +1466,11 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("stream scan called");
qDebug("doRawScan called");
if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){
SSDataBlock* pBlock = &pInfo->pRes;
while (tsdbNextDataBlock(pInfo->dataReader)) {
if (tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -1483,23 +1483,31 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
pBlock->pDataBlock = pCols;
if (pCols == NULL) {
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
return NULL;
}else{
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
continue;
}
longjmp(pTaskInfo->env, terrno);
}
qDebug("tmqsnap doRawScan get data uid:%ld", pBlock->info.uid);
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
return pBlock;
}
qDebug("stream scan tsdb return null");
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal");
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
}else{
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
}
qDebug("tmqsnap stream scan tsdb return null");
return NULL;
}else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){
SSnapContext *sContext = pInfo->sContext;
......@@ -1508,7 +1516,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
int16_t type = 0;
int64_t uid = 0;
if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){
qError("getMetafromSnapShot error");
qError("tmqsnap getMetafromSnapShot error");
taosMemoryFreeClear(data);
return NULL;
}
......@@ -1518,7 +1526,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = 0;
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
}else{
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid;
......@@ -1542,7 +1550,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return NULL;
}
SWalCont* pHead = &pInfo->pCkHead->head;
qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
......@@ -1550,6 +1558,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = fetchVer;
pInfo->hasDataInOneFetchVer = false;
pInfo->pRes.pDataBlock = NULL;
}
}
......@@ -1559,7 +1568,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
blockDataFreeRes(&pInfo->pRes);
SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes);
if(block){
qDebug("fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
pInfo->needFetchLog = false;
pInfo->hasDataInOneFetchVer = true;
return block;
......@@ -1574,7 +1583,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
}
} else if(pInfo->sContext->withMeta){
ASSERT(IS_META_MSG(pHead->msgType));
qDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
......
......@@ -415,7 +415,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS;
}
int32_t metaSize = (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
*pDst = taosMemoryMalloc(metaSize);
if (NULL == *pDst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册