提交 a9afde8f 编写于 作者: wmmhello's avatar wmmhello

change json type with qid return logic

上级 85a0ec99
...@@ -2643,12 +2643,37 @@ int tscProcessShowCreateRsp(SSqlObj *pSql) { ...@@ -2643,12 +2643,37 @@ int tscProcessShowCreateRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, 1); return tscLocalResultCommonBuilder(pSql, 1);
} }
static void updateFieldForJson(SSqlObj *pSql, SQueryTableRsp *pQueryAttr){
if(pQueryAttr->tJsonSchLen <= 0) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;
for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) {
SInternalField *pField = tscFieldInfoGetInternalField(pFieldInfo, i);
if (pField->field.type == TSDB_DATA_TYPE_JSON) {
for (int k = 0; k < pQueryAttr->tJsonSchLen; ++k) {
if (strncmp(pField->field.name, pQueryAttr->tagJsonSchema[k].name, TSDB_MAX_JSON_KEY_LEN) == 0) {
pField->field.type = pQueryAttr->tagJsonSchema[k].type;
pField->field.bytes = TYPE_BYTES[pField->field.type];
tscDebug("0x%" PRIx64 " change json type %s:%s to %d", pSql->self, pField->field.name, pQueryAttr->tagJsonSchema[k].name,
pField->field.type);
break;
}
}
}
}
}
int tscProcessQueryRsp(SSqlObj *pSql) { int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp; SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp;
pQueryAttr->qId = htobe64(pQueryAttr->qId); pQueryAttr->qId = htobe64(pQueryAttr->qId);
pQueryAttr->tJsonSchLen = htons(pQueryAttr->tJsonSchLen);
updateFieldForJson(pSql, pQueryAttr);
pRes->qId = pQueryAttr->qId; pRes->qId = pQueryAttr->qId;
pRes->data = NULL; pRes->data = NULL;
......
...@@ -28,7 +28,7 @@ typedef void* qinfo_t; ...@@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId, void** tJsonSchema);
/** /**
......
...@@ -258,6 +258,11 @@ typedef struct SSchema { ...@@ -258,6 +258,11 @@ typedef struct SSchema {
int16_t bytes; int16_t bytes;
} SSchema; } SSchema;
typedef struct TagJsonSSchema {
uint8_t type;
char name[TSDB_MAX_JSON_KEY_LEN + 1];
} TagJsonSSchema;
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
...@@ -514,6 +519,8 @@ typedef struct { ...@@ -514,6 +519,8 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle union{uint64_t qhandle; uint64_t qId;}; // query handle
uint16_t tJsonSchLen;
TagJsonSSchema tagJsonSchema[];
} SQueryTableRsp; } SQueryTableRsp;
// todo: the show handle should be replaced with id // todo: the show handle should be replaced with id
......
...@@ -419,6 +419,8 @@ int tsdbCompact(STsdbRepo *pRepo); ...@@ -419,6 +419,8 @@ int tsdbCompact(STsdbRepo *pRepo);
bool tsdbNoProblem(STsdbRepo* pRepo); bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB // unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
// for tag json
uint8_t getTagJsonType(STsdbRepo* tsdb, uint64_t uid, char* key, int32_t len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -67,7 +67,39 @@ void freeParam(SQueryParam *param) { ...@@ -67,7 +67,39 @@ void freeParam(SQueryParam *param) {
tfree(param->prevResult); tfree(param->prevResult);
} }
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId) { static void* setJsonTagSchema(void* tsdb, int16_t numOfOutput, SExprInfo *pExprs, SArray* pTableIdList){
uint16_t cnt = 0;
for (int i = 0; i < numOfOutput; ++i) {
SSqlExpr* sqlExpr = &pExprs[i].base;
if (sqlExpr->colType == TSDB_DATA_TYPE_JSON && sqlExpr->numOfParams > 0) {
cnt ++;
}
}
if(cnt <= 0) return NULL;
void* tJsonSchData = calloc(1, SHORT_BYTES + cnt*sizeof(TagJsonSSchema));
*(uint16_t*)(tJsonSchData) = cnt;
void* tmp = tJsonSchData + SHORT_BYTES;
for (int i = 0; i < numOfOutput; ++i) {
SSqlExpr* sqlExpr = &pExprs[i].base;
if (sqlExpr->colType == TSDB_DATA_TYPE_JSON && sqlExpr->numOfParams > 0){
TagJsonSSchema* schema = (TagJsonSSchema*)(tmp);
schema->type = TSDB_DATA_TYPE_NULL;
tstrncpy(schema->name, sqlExpr->param[0].pz, TSDB_MAX_JSON_KEY_LEN + 1);
for (int j = 0; j < taosArrayGetSize(pTableIdList); ++j) {
STableIdInfo *id = taosArrayGet(pTableIdList, j);
uint8_t type = getTagJsonType(tsdb, id->uid, sqlExpr->param[0].pz, sqlExpr->param[0].nLen);
if(type != TSDB_DATA_TYPE_NULL) {
schema->type = type;
break;
}
}
tmp += sizeof(TagJsonSSchema);
}
}
return tJsonSchData;
}
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId, void** tJsonSchema) {
assert(pQueryMsg != NULL && tsdb != NULL); assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -168,7 +200,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -168,7 +200,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(pQueryMsg->stableQuery == isSTableQuery); assert(pQueryMsg->stableQuery == isSTableQuery);
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo); param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo);
*tJsonSchema = setJsonTagSchema(tsdb, pQueryMsg->numOfOutput, param.pExprs, param.pTableIdList);
param.sql = NULL; param.sql = NULL;
param.pExprs = NULL; param.pExprs = NULL;
param.pSecExprs = NULL; param.pSecExprs = NULL;
......
...@@ -4112,5 +4112,16 @@ static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo ...@@ -4112,5 +4112,16 @@ static int32_t tsdbQueryTableList(STable* pTable, SArray* pRes, void* filterInfo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
uint8_t getTagJsonType(STsdbRepo* tsdb, uint64_t uid, char* key, int32_t len){
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
void* result = getJsonTagValueElment(pTable, key, len, NULL, TSDB_MAX_JSON_TAGS_LEN);
if(result){
return *(char*)result;
}else{
return TSDB_DATA_TYPE_NULL;
}
}
...@@ -239,14 +239,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -239,14 +239,25 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
uint64_t qId = genQueryId(); uint64_t qId = genQueryId();
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId); void* tJsonSchema = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &tJsonSchema);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); int extSize = 0;
if (tJsonSchema != NULL){
uint16_t cnt = *(uint16_t*)tJsonSchema;
extSize = cnt * sizeof(TagJsonSSchema);
}
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp) + extSize);
pRsp->code = code; pRsp->code = code;
pRsp->qId = 0; pRsp->qId = 0;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp) + extSize;
pRet->rsp = pRsp; pRet->rsp = pRsp;
if (tJsonSchema != NULL){
pRsp->tJsonSchLen = htons(*(uint16_t*)tJsonSchema);
memcpy(pRsp->tagJsonSchema, tJsonSchema + SHORT_BYTES, extSize);
tfree(tJsonSchema);
}
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
// current connect is broken // current connect is broken
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册