diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c index 7ef6415743be2222872473d4dd6c8fa506a319c1..fd6bfc6812223db54a09767736d36fdd275d3f78 100644 --- a/examples/c/tmq_taosx.c +++ b/examples/c/tmq_taosx.c @@ -219,19 +219,19 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "drop table ct3 ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "drop table ct3 ct1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); if (taos_errno(pRes) != 0) { @@ -282,12 +282,12 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "drop table n1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "drop table n1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); if (taos_errno(pRes) != 0) { @@ -319,12 +319,12 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); taos_close(pConn); return 0; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 9148783d9933feb018caa05dea27c327e2a227e0..8088912e9a58e92fd51a37e252adc87708a89a6e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -189,6 +189,12 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); +int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo); + +const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); + +const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); + void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index eb4c87ad31188a28fc372a9100a05e445ec2e7d2..b280110f932c3951e9dea8ac9f4aba12f3fab7e4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -156,8 +156,14 @@ typedef struct SMetaTableInfo{ int64_t suid; int64_t uid; SSchemaWrapper *schema; + char tbName[TSDB_TABLE_NAME_LEN]; }SMetaTableInfo; +typedef struct SIdInfo{ + int64_t version; + int32_t index; +}SIdInfo; + typedef struct SSnapContext { SMeta *pMeta; int64_t snapVersion; @@ -166,6 +172,8 @@ typedef struct SSnapContext { int8_t subType; SHashObj *idVersion; SHashObj *suidInfo; + SArray *idList; + int32_t index; bool withMeta; bool queryMetaOrData; // true-get meta, false-get data }SSnapContext; diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 064bad10a990004ca22290a5ef63e3db32a8794a..89673a97cad068de3442f5463062555d47e63a00 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -200,21 +200,59 @@ typedef struct STableInfoForChildTable{ char *tableName; SArray *tagName; SSchemaWrapper *schemaRow; + SSchemaWrapper *tagRow; }STableInfoForChildTable; static void destroySTableInfoForChildTable(void* data) { STableInfoForChildTable* pData = (STableInfoForChildTable*)data; - taosMemoryFree(pData->tagName); + taosMemoryFree(pData->tableName); taosArrayDestroy(pData->tagName); tDeleteSSchemaWrapper(pData->schemaRow); } -static void clearAndMoveToFirst(SSnapContext* ctx){ +static void MoveToSnapShotVersion(SSnapContext* ctx){ + tdbTbcClose(ctx->pCur); + tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); + STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX}; + int c = 0; + tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); + if(c < 0){ + tdbTbcMoveToPrev(ctx->pCur); + } +} + +static void MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid){ + tdbTbcClose(ctx->pCur); + tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); + STbDbKey key = {.version = ver, .uid = uid}; + int c = 0; + tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); + ASSERT(c == 0); +} + +static void MoveToFirst(SSnapContext* ctx){ tdbTbcClose(ctx->pCur); tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL); tdbTbcMoveToFirst(ctx->pCur); } +static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ + STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)); + if(data){ + return; + } + STableInfoForChildTable dataTmp = {0}; + dataTmp.tableName = strdup(me->name); + dataTmp.tagName = taosArrayInit(me->stbEntry.schemaTag.nCols, TSDB_COL_NAME_LEN); + for(int i = 0; i < me->stbEntry.schemaTag.nCols; i++){ + SSchema *schema = &me->stbEntry.schemaTag.pSchema[i]; + taosArrayPush(dataTmp.tagName, schema->name); + } + dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow); + dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag); + taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); +} + int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet){ SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); if(ctx == NULL) return -1; @@ -225,10 +263,6 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t ctx->subType = subType; ctx->queryMetaOrData = withMeta; ctx->withMeta = withMeta; - int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL); - if (ret < 0) { - return -1; - } ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if(ctx->idVersion == NULL){ return -1; @@ -240,25 +274,84 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t } taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable); + ctx->index = 0; + ctx->idList = taosArrayInit(100, sizeof(int64_t)); void *pKey = NULL; void *pVal = NULL; - int vLen, kLen; + int vLen = 0, kLen = 0; - tdbTbcMoveToFirst(ctx->pCur); + metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion); + MoveToFirst(ctx); + while(1){ + int32_t ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) break; + STbDbKey *tmp = (STbDbKey*)pKey; + if (tmp->version > ctx->snapVersion) break; + + SDecoder dc = {0}; + SMetaEntry me = {0}; + tDecoderInit(&dc, pVal, vLen); + metaDecodeEntry(&dc, &me); + if(ctx->subType == TOPIC_SUB_TYPE__TABLE){ + if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) || + (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){ + continue; + } + } + SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); + if(!idData){ + taosArrayPush(ctx->idList, &tmp->uid); + metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid); + SIdInfo info = {0}; + taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)); + } + } + taosHashClear(ctx->idVersion); + + MoveToSnapShotVersion(ctx); while(1){ - ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); + int32_t ret = tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen); if (ret < 0) break; STbDbKey *tmp = (STbDbKey*)pKey; - if(tmp->version > ctx->snapVersion) break; - taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t)); + SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); + if(!idData){ + SIdInfo info = {.version = tmp->version, .index = 0}; + taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)); + } + + SDecoder dc = {0}; + SMetaEntry me = {0}; + tDecoderInit(&dc, pVal, vLen); + metaDecodeEntry(&dc, &me); + if(ctx->subType == TOPIC_SUB_TYPE__TABLE){ + if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) || + (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)){ + continue; + } + } + + if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) + || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { + saveSuperTableInfoForChildTable(&me, ctx->suidInfo); + } + tDecoderClear(&dc); + } + + for(int i = 0; i < taosArrayGetSize(ctx->idList); i++){ + int64_t *uid = taosArrayGet(ctx->idList, i); + SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t)); + ASSERT(idData); + idData->index = i; + metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index); } - clearAndMoveToFirst(ctx); + return TDB_CODE_SUCCESS; } int32_t destroySnapContext(SSnapContext* ctx){ tdbTbcClose(ctx->pCur); + taosArrayDestroy(ctx->idList); taosHashCleanup(ctx->idVersion); taosHashCleanup(ctx->suidInfo); taosMemoryFree(ctx); @@ -327,42 +420,20 @@ static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *co return 0; } -static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ - STableInfoForChildTable dataTmp = {0}; - dataTmp.tableName = strdup(me->name); - dataTmp.tagName = taosArrayInit(me->stbEntry.schemaTag.nCols, TSDB_COL_NAME_LEN); - for(int i = 0; i < me->stbEntry.schemaTag.nCols; i++){ - SSchema *schema = &me->stbEntry.schemaTag.pSchema[i]; - taosArrayPush(dataTmp.tagName, schema->name); - } - dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow); - - STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)); - if(data){ - destroySTableInfoForChildTable(data); - } - taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); -} - int32_t setForSnapShot(SSnapContext* ctx, int64_t uid){ int c = 0; - if(uid == -1){ - return c; - } - if(uid == 0){ - clearAndMoveToFirst(ctx); + ctx->index = 0; return c; } - int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t)); - if(!ver){ + SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t)); + if(!idInfo){ return -1; } - STbDbKey key = {.version = *ver, .uid = uid}; - tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c); + ctx->index = idInfo->index; return c; } @@ -371,142 +442,141 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in int32_t ret = 0; void *pKey = NULL; void *pVal = NULL; - int vLen, kLen; - - while(1){ - ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); - if (ret < 0) { - ctx->queryMetaOrData = false; // change to get data - clearAndMoveToFirst(ctx); - return 0; - } - - STbDbKey *tmp = (STbDbKey*)pKey; - if(tmp->version > ctx->snapVersion) { - clearAndMoveToFirst(ctx); - ctx->queryMetaOrData = false; // change to get data - return 0; - } - int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); - ASSERT(ver); - if(*ver > tmp->version){ - continue; - } - ASSERT(*ver == tmp->version); - - *uid = tmp->uid; - SDecoder dc = {0}; - SMetaEntry me = {0}; - tDecoderInit(&dc, pVal, vLen); - metaDecodeEntry(&dc, &me); - - if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) - || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { - saveSuperTableInfoForChildTable(&me, ctx->suidInfo); + int vLen = 0, kLen = 0; - SVCreateStbReq req = {0}; - req.name = me.name; - req.suid = me.uid; - req.schemaRow = me.stbEntry.schemaRow; - req.schemaTag = me.stbEntry.schemaTag; - - ret = buildSuperTableInfo(&req, pBuf, contLen); - tDecoderClear(&dc); - *type = TDMT_VND_CREATE_STB; - break; - } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) - || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { + if(ctx->index >= taosArrayGetSize(ctx->idList)){ + metaDebug("tmqsnap get meta end"); + ctx->index = 0; + ctx->queryMetaOrData = false; // change to get data + return 0; + } - STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); - if(!data){ // if table has been deleted - tDecoderClear(&dc); - continue; - } - SVCreateTbReq req = {0}; - - req.type = TD_CHILD_TABLE; - req.name = me.name; - req.uid = me.uid; - req.commentLen = -1; - req.ctb.suid = me.ctbEntry.suid; - req.ctb.tagNum = taosArrayGetSize(data->tagName); - req.ctb.name = data->tableName; + int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index); + ctx->index++; + SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t)); + ASSERT(idInfo); + + *uid = *uidTmp; + MoveToPosition(ctx, idInfo->version, *uidTmp); + tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen); + SDecoder dc = {0}; + SMetaEntry me = {0}; + tDecoderInit(&dc, pVal, vLen); + metaDecodeEntry(&dc, &me); + metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index-1); + + if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) + || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) { + SVCreateStbReq req = {0}; + req.name = me.name; + req.suid = me.uid; + req.schemaRow = me.stbEntry.schemaRow; + req.schemaTag = me.stbEntry.schemaTag; + req.schemaRow.version = 1; + req.schemaTag.version = 1; + + ret = buildSuperTableInfo(&req, pBuf, contLen); + *type = TDMT_VND_CREATE_STB; + + } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) + || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { + STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); + ASSERT(data); + SVCreateTbReq req = {0}; + + req.type = TD_CHILD_TABLE; + req.name = me.name; + req.uid = me.uid; + req.commentLen = -1; + req.ctb.suid = me.ctbEntry.suid; + req.ctb.tagNum = taosArrayGetSize(data->tagName); + req.ctb.name = data->tableName; + +// SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t)); +// if(sidInfo->version >= idInfo->version){ +// // need parse tag +// STag* p = (STag*)me.ctbEntry.pTags; +// SArray* pTagVals = NULL; +// if (tTagToValArray((const STag*)p, &pTagVals) != 0) { +// } +// +// int16_t nCols = taosArrayGetSize(pTagVals); +// for (int j = 0; j < nCols; ++j) { +// STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); +// } +// }else{ req.ctb.pTag = me.ctbEntry.pTags; - req.ctb.tagName = data->tagName; - ret = buildNormalChildTableInfo(&req, pBuf, contLen); - tDecoderClear(&dc); - *type = TDMT_VND_CREATE_TABLE; - break; - } else if(ctx->subType == TOPIC_SUB_TYPE__DB){ - SVCreateTbReq req = {0}; - req.type = TD_NORMAL_TABLE; - req.name = me.name; - req.uid = me.uid; - req.commentLen = -1; - req.ntb.schemaRow = me.ntbEntry.schemaRow; - ret = buildNormalChildTableInfo(&req, pBuf, contLen); - tDecoderClear(&dc); - *type = TDMT_VND_CREATE_TABLE; - break; - } else{ - tDecoderClear(&dc); - continue; - } +// } + + req.ctb.tagName = data->tagName; + ret = buildNormalChildTableInfo(&req, pBuf, contLen); + *type = TDMT_VND_CREATE_TABLE; + } else if(ctx->subType == TOPIC_SUB_TYPE__DB){ + SVCreateTbReq req = {0}; + req.type = TD_NORMAL_TABLE; + req.name = me.name; + req.uid = me.uid; + req.commentLen = -1; + req.ntb.schemaRow = me.ntbEntry.schemaRow; + ret = buildNormalChildTableInfo(&req, pBuf, contLen); + *type = TDMT_VND_CREATE_TABLE; + } else{ + ASSERT(0); } + tDecoderClear(&dc); return ret; } SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx){ SMetaTableInfo result = {0}; - int32_t ret = 0; void *pKey = NULL; void *pVal = NULL; int vLen, kLen; while(1){ - ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); - if (ret < 0) { + if(ctx->index >= taosArrayGetSize(ctx->idList)){ + metaDebug("tmqsnap get uid info end"); return result; } + int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index); + ctx->index++; + SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t)); + ASSERT(idInfo); - STbDbKey *tmp = (STbDbKey*)pKey; - if(tmp->version > ctx->snapVersion) { - return result; - } - int64_t* ver = (int64_t*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t)); - ASSERT(ver); - if(*ver > tmp->version){ - continue; - } - ASSERT(*ver == tmp->version); - + MoveToPosition(ctx, idInfo->version, *uidTmp); + tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen); SDecoder dc = {0}; SMetaEntry me = {0}; tDecoderInit(&dc, pVal, vLen); metaDecodeEntry(&dc, &me); + metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index-1); if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE){ STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); result.uid = me.uid; result.suid = me.ctbEntry.suid; - result.schema = data->schemaRow; + result.schema = tCloneSSchemaWrapper(data->schemaRow); + strcpy(result.tbName, me.name); tDecoderClear(&dc); break; } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) { result.uid = me.uid; result.suid = 0; - result.schema = &me.ntbEntry.schemaRow; + strcpy(result.tbName, me.name); + result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); tDecoderClear(&dc); break; } else if(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) { STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); result.uid = me.uid; result.suid = me.ctbEntry.suid; - result.schema = data->schemaRow; + strcpy(result.tbName, me.name); + result.schema = tCloneSSchemaWrapper(data->schemaRow); tDecoderClear(&dc); break; } else{ + metaDebug("tmqsnap get uid continue"); tDecoderClear(&dc); continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 76dffab3193bc7d4a04d9f3e12c6c7226c7f1978..d2dd53fcdf090ffa9a02fa9b7829203f78a072c3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -464,6 +464,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.subType = req.subType; pHandle->fetchMeta = req.withMeta; + // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); if (pRef == NULL) { @@ -480,10 +481,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .initTqReader = true, .version = ver, }; + pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; - pHandle->snapshotVer = ver; req.qmsg = NULL; pHandle->execHandle.task = diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index ccf1c3057979664d38e85c02af8bc188b19c11ae..afa7b943901366ab26b295b7d149e0e145aac0b4 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -83,26 +83,32 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("task start to execute"); + tqDebug("tmqsnap task start to execute"); if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } - tqDebug("task execute end, get %p", pDataBlock); + tqDebug("tmqsnap task execute end, get %p", pDataBlock); if (pDataBlock != NULL) { if (pRsp->withTbName) { int64_t uid = 0; if (pOffset->type == TMQ_OFFSET__LOG) { uid = pExec->pExecReader->msgIter.uid; + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } else { - uid = pDataBlock->info.uid; - } - if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { - continue; + char* tbName = strdup(qExtractTbnameFromTask(task)); + taosArrayPush(pRsp->blockTbName, &tbName); } } if(pRsp->withSchema){ - tqAddBlockSchemaToRsp(pExec, pRsp); + if (pOffset->type == TMQ_OFFSET__LOG) { + tqAddBlockSchemaToRsp(pExec, pRsp); + }else{ + SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); + taosArrayPush(pRsp->blockSchema, &pSW); + } } tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); pRsp->blockNum++; @@ -114,8 +120,11 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* } } - if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), + if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ + if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && qStreamExtractPrepareUid(task) != 0){ + continue; + } + tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); tqOffsetResetToLog(pOffset, pHandle->snapshotVer); qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); @@ -124,7 +133,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* if (pRsp->blockNum > 0){ qStreamExtractOffset(task, &pRsp->rspOffset); - tqDebug("task exec exited, get data"); + tqDebug("tmqsnap task exec exited, get data"); break; } @@ -132,12 +141,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){ qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType); tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; - tqDebug("task exec change to get meta"); + tqDebug("tmqsnap task exec change to get data"); continue; } *pMetaRsp = *tmp; - tqDebug("task exec exited, get meta"); + tqDebug("tmqsnap task exec exited, get meta"); break; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 54498ebc85b1aa0dd65fa9492688b762fea4527a..18756664da52b632d4eff0443ba5d7cd4ab26708 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -150,6 +150,8 @@ typedef struct { STqOffsetVal prepareStatus; // for tmq STqOffsetVal lastStatus; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta + SSchemaWrapper *schema; + char tbName[TSDB_TABLE_NAME_LEN]; SSDataBlock* pullOverBlk; // for streaming SWalFilterCond cond; int64_t lastScanUid; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b620e92766e253ac66cdf44003f7c65d3252cc6b..018553e7dc7f55c0b480d90a568b4877c544ab3f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -605,9 +605,14 @@ void* qExtractReaderFromStreamScanner(void* scanner) { return (void*)pInfo->tqReader; } -const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) { - SStreamScanInfo* pInfo = scanner; - return pInfo->tqReader->pSchemaWrapper; +const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.schema; +} + +const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.tbName; } SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { @@ -616,6 +621,12 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { return &pTaskInfo->streamInfo.metaRsp; } +int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); + return pTaskInfo->streamInfo.prepareStatus.uid; +} + int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); @@ -752,14 +763,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); - if(pOffset->uid == 0) pOffset->uid = mtInfo.uid; - if(pOffset->ts == 0) pOffset->ts = INT64_MIN; - - if (pOffset->uid == 0) { - qError("setDataForSnapShot error. uid = 0 "); - return -1; - } - tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); @@ -770,7 +773,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); - qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts); + strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); + tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); + pTaskInfo->streamInfo.schema = mtInfo.schema; + qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts); }else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){ SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; @@ -778,10 +784,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid); return -1; } + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid); }else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; + qDebug("tmqsnap qStreamPrepareScan snapshot log"); } return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 73745dd8a9c48fab26fa8d1a84cb5fdc1e41b131..a2c96392caec0e449f60fd39f4d958f0e6216986 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1466,11 +1466,11 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; - qDebug("stream scan called"); + qDebug("doRawScan called"); if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){ SSDataBlock* pBlock = &pInfo->pRes; - while (tsdbNextDataBlock(pInfo->dataReader)) { + if (tsdbNextDataBlock(pInfo->dataReader)) { if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -1483,23 +1483,31 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); pBlock->pDataBlock = pCols; if (pCols == NULL) { - SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); - if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal - return NULL; - }else{ - pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; - qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); - continue; - } + longjmp(pTaskInfo->env, terrno); } + qDebug("tmqsnap doRawScan get data uid:%ld", pBlock->info.uid); pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; - return pBlock; } - qDebug("stream scan tsdb return null"); + + SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); + if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal + qDebug("tmqsnap read snapshot done, change to get data from wal"); + pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; + tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); + }else{ + pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; + pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN; + qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); + qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); + strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); + tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); + pTaskInfo->streamInfo.schema = mtInfo.schema; + } + qDebug("tmqsnap stream scan tsdb return null"); return NULL; }else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){ SSnapContext *sContext = pInfo->sContext; @@ -1508,7 +1516,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { int16_t type = 0; int64_t uid = 0; if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){ - qError("getMetafromSnapShot error"); + qError("tmqsnap getMetafromSnapShot error"); taosMemoryFreeClear(data); return NULL; } @@ -1518,7 +1526,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA; pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0; - pTaskInfo->streamInfo.metaRsp.rspOffset.ts = 0; + pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN; }else{ pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.uid = uid; @@ -1542,7 +1550,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } SWalCont* pHead = &pInfo->pCkHead->head; - qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); + qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; @@ -1550,6 +1558,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; pTaskInfo->streamInfo.lastStatus.version = fetchVer; pInfo->hasDataInOneFetchVer = false; + pInfo->pRes.pDataBlock = NULL; } } @@ -1559,7 +1568,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { blockDataFreeRes(&pInfo->pRes); SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); if(block){ - qDebug("fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); pInfo->needFetchLog = false; pInfo->hasDataInOneFetchVer = true; return block; @@ -1574,7 +1583,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } } else if(pInfo->sContext->withMeta){ ASSERT(IS_META_MSG(pHead->msgType)); - qDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4cad6a078b02e34777e052248a429a3626a8b81e..41333e77566f26b34e3a37f7181c59e129fd4d0e 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -415,7 +415,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { return TSDB_CODE_SUCCESS; } - int32_t metaSize = (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); + int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); *pDst = taosMemoryMalloc(metaSize); if (NULL == *pDst) { return TSDB_CODE_TSC_OUT_OF_MEMORY;