diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 2e1b480fb15eb72a7b6d228a7d72737b65f61156..62f754806f719fd07f2cbbe462b116da866c9b07 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2643,12 +2643,37 @@ int tscProcessShowCreateRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 1); } +static void updateFieldForJson(SSqlObj *pSql, SQueryTableRsp *pQueryAttr){ + if(pQueryAttr->tJsonSchLen <= 0) { + return; + } + + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); + SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo; + for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) { + SInternalField *pField = tscFieldInfoGetInternalField(pFieldInfo, i); + + if (pField->field.type == TSDB_DATA_TYPE_JSON) { + for (int k = 0; k < pQueryAttr->tJsonSchLen; ++k) { + if (strncmp(pField->field.name, pQueryAttr->tagJsonSchema[k].name, TSDB_MAX_JSON_KEY_LEN) == 0) { + pField->field.type = pQueryAttr->tagJsonSchema[k].type; + pField->field.bytes = TYPE_BYTES[pField->field.type]; + tscDebug("0x%" PRIx64 " change json type %s:%s to %d", pSql->self, pField->field.name, pQueryAttr->tagJsonSchema[k].name, + pField->field.type); + break; + } + } + } + } +} + int tscProcessQueryRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp; pQueryAttr->qId = htobe64(pQueryAttr->qId); - + pQueryAttr->tJsonSchLen = htons(pQueryAttr->tJsonSchLen); + updateFieldForJson(pSql, pQueryAttr); pRes->qId = pQueryAttr->qId; pRes->data = NULL; diff --git a/src/inc/query.h b/src/inc/query.h index 0872e3dbaa517ded77dd758b30e69f273c13a580..67042764fefb74008be7cbcc387e8d89d542263b 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -28,7 +28,7 @@ typedef void* qinfo_t; * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId, void** tJsonSchema); /** diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c6d587fe1a296bc40ab804cdef160b70da273fd8..fa57982c3b95e7a80bf5a790720d6e4ea3f93f01 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -258,6 +258,11 @@ typedef struct SSchema { int16_t bytes; } SSchema; +typedef struct TagJsonSSchema { + uint8_t type; + char name[TSDB_MAX_JSON_KEY_LEN + 1]; +} TagJsonSSchema; + typedef struct { int32_t contLen; int32_t vgId; @@ -514,6 +519,8 @@ typedef struct { typedef struct { int32_t code; union{uint64_t qhandle; uint64_t qId;}; // query handle + uint16_t tJsonSchLen; + TagJsonSSchema tagJsonSchema[]; } SQueryTableRsp; // todo: the show handle should be replaced with id diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index a90780b202c6a88ab4f6ced3b5896f48ffc2538b..a4d73a84e1d33cff6cf83b29e80323418a0f31b8 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -419,6 +419,8 @@ int tsdbCompact(STsdbRepo *pRepo); bool tsdbNoProblem(STsdbRepo* pRepo); // unit of walSize: MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); +// for tag json +uint8_t getTagJsonType(STsdbRepo* tsdb, uint64_t uid, char* key, int32_t len); #ifdef __cplusplus } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index fce7f649892f87d075c8dd64e4d1160e5d05bf77..1ead8f60315af3e88199ddf4a49ca66824c4ac0b 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -67,7 +67,39 @@ void freeParam(SQueryParam *param) { tfree(param->prevResult); } -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId) { +static void* setJsonTagSchema(void* tsdb, int16_t numOfOutput, SExprInfo *pExprs, SArray* pTableIdList){ + uint16_t cnt = 0; + for (int i = 0; i < numOfOutput; ++i) { + SSqlExpr* sqlExpr = &pExprs[i].base; + if (sqlExpr->colType == TSDB_DATA_TYPE_JSON && sqlExpr->numOfParams > 0) { + cnt ++; + } + } + if(cnt <= 0) return NULL; + void* tJsonSchData = calloc(1, SHORT_BYTES + cnt*sizeof(TagJsonSSchema)); + *(uint16_t*)(tJsonSchData) = cnt; + void* tmp = tJsonSchData + SHORT_BYTES; + for (int i = 0; i < numOfOutput; ++i) { + SSqlExpr* sqlExpr = &pExprs[i].base; + if (sqlExpr->colType == TSDB_DATA_TYPE_JSON && sqlExpr->numOfParams > 0){ + TagJsonSSchema* schema = (TagJsonSSchema*)(tmp); + schema->type = TSDB_DATA_TYPE_NULL; + tstrncpy(schema->name, sqlExpr->param[0].pz, TSDB_MAX_JSON_KEY_LEN + 1); + for (int j = 0; j < taosArrayGetSize(pTableIdList); ++j) { + STableIdInfo *id = taosArrayGet(pTableIdList, j); + uint8_t type = getTagJsonType(tsdb, id->uid, sqlExpr->param[0].pz, sqlExpr->param[0].nLen); + if(type != TSDB_DATA_TYPE_NULL) { + schema->type = type; + break; + } + } + tmp += sizeof(TagJsonSSchema); + } + } + return tJsonSchData; +} + +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId, void** tJsonSchema) { assert(pQueryMsg != NULL && tsdb != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -168,7 +200,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi assert(pQueryMsg->stableQuery == isSTableQuery); (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo); - + *tJsonSchema = setJsonTagSchema(tsdb, pQueryMsg->numOfOutput, param.pExprs, param.pTableIdList); param.sql = NULL; param.pExprs = NULL; param.pSecExprs = NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 145da28e2e881f14fdfb37594eae3d5f68499bf0..e982530f90cc206c357d1e926785e78d3ddac916 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -4112,5 +4112,16 @@ static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo return TSDB_CODE_SUCCESS; } +uint8_t getTagJsonType(STsdbRepo* tsdb, uint64_t uid, char* key, int32_t len){ + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); + void* result = getJsonTagValueElment(pTable, key, len, NULL, TSDB_MAX_JSON_TAGS_LEN); + if(result){ + return *(char*)result; + }else{ + return TSDB_DATA_TYPE_NULL; + } +} + + diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index e8495cac6d7de10018757876fba3674bda0e6231..f9d61c5b37579c04585c1bd921d9a218ce567cd1 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -239,14 +239,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (contLen != 0) { qinfo_t pQInfo = NULL; uint64_t qId = genQueryId(); - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId); + void* tJsonSchema = NULL; + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &tJsonSchema); - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); + int extSize = 0; + if (tJsonSchema != NULL){ + uint16_t cnt = *(uint16_t*)tJsonSchema; + extSize = cnt * sizeof(TagJsonSSchema); + } + SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp) + extSize); pRsp->code = code; pRsp->qId = 0; - pRet->len = sizeof(SQueryTableRsp); + pRet->len = sizeof(SQueryTableRsp) + extSize; pRet->rsp = pRsp; + if (tJsonSchema != NULL){ + pRsp->tJsonSchLen = htons(*(uint16_t*)tJsonSchema); + memcpy(pRsp->tagJsonSchema, tJsonSchema + SHORT_BYTES, extSize); + tfree(tJsonSchema); + } int32_t vgId = pVnode->vgId; // current connect is broken