提交 187a79d3 编写于 作者: wmmhello's avatar wmmhello

TD-6129<feature> add tag-> select logic

上级 7a09aeb8
...@@ -28,6 +28,12 @@ extern "C" { ...@@ -28,6 +28,12 @@ extern "C" {
#include "tscGlobalmerge.h" #include "tscGlobalmerge.h"
#include "tsched.h" #include "tsched.h"
#include "tsclient.h" #include "tsclient.h"
#include "tglobal.h"
#define JSON_TYPE_BINARY (strcasecmp(tsDefaultJSONStrType, "binary") == 0)
#define JSON_TYPE_NCHAR (strcasecmp(tsDefaultJSONStrType, "nchar") == 0)
#define SELECT_ALL_JSON_TAG 1
#define SELECT_ELEMENT_JSON_TAG 2
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ #define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
...@@ -376,6 +382,8 @@ char* cloneCurrentDBName(SSqlObj* pSql); ...@@ -376,6 +382,8 @@ char* cloneCurrentDBName(SSqlObj* pSql);
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId); int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId);
char* parseTagDatatoJson(void *p); char* parseTagDatatoJson(void *p);
void findTagValue(void* data, char* key, int32_t keyLen, char* out);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -729,11 +729,15 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo ...@@ -729,11 +729,15 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
} else { } else {
char* json = parseTagDatatoJson(p); if (*p == SELECT_ALL_JSON_TAG){
if(json){ char* json = parseTagDatatoJson(p+1);
memcpy(varDataVal(dst), json, strlen(json)); if(json) {
varDataSetLen(dst, strlen(json)); memcpy(varDataVal(dst), json, strlen(json));
tfree(json); varDataSetLen(dst, strlen(json));
tfree(json);
}
}else if (*p == SELECT_ELEMENT_JSON_TAG){
memcpy(dst, p+1, varDataTLen(p+1));
}else{ }else{
tscError("construct json error"); tscError("construct json error");
} }
...@@ -5174,9 +5178,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) { ...@@ -5174,9 +5178,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
return p; return p;
} }
pExprInfo[j].base.param[0].nLen void findTagValue(void* data, char* key, int32_t keyLen, char* out){
char* findTagValue(void* data, char* key, int32_t keyLen){
int16_t nCols = kvRowNCols(data); int16_t nCols = kvRowNCols(data);
bool found = false; bool found = false;
...@@ -5186,14 +5188,19 @@ char* findTagValue(void* data, char* key, int32_t keyLen){ ...@@ -5186,14 +5188,19 @@ char* findTagValue(void* data, char* key, int32_t keyLen){
if (k % 2 != 0) { // json key if (k % 2 != 0) { // json key
char tagJsonKey[TSDB_MAX_TAGS_LEN] = {0}; char tagJsonKey[TSDB_MAX_TAGS_LEN] = {0};
int32_t length = taosUcs4ToMbs(varDataVal(result), varDataLen(result), tagJsonKey); if (JSON_TYPE_BINARY){
if (length == 0) { if (keyLen != varDataLen(result)) continue;
tscError("charset:%s to %s. val:%s convert json key failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, if (memcmp(varDataVal(result), key, keyLen) != 0) continue;
(char*)result); } else if(JSON_TYPE_NCHAR){
continue; int32_t length = taosUcs4ToMbs(varDataVal(result), varDataLen(result), tagJsonKey);
} if (length == 0) {
if (strncmp(key, tagJsonKey, keyLen) != 0) { tscError("charset:%s to %s. val:%s convert json key failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
continue; (char*)result);
continue;
}
if (strncmp(key, tagJsonKey, keyLen) != 0) {
continue;
}
} }
found = true; found = true;
} else { // json value } else { // json value
...@@ -5206,12 +5213,18 @@ char* findTagValue(void* data, char* key, int32_t keyLen){ ...@@ -5206,12 +5213,18 @@ char* findTagValue(void* data, char* key, int32_t keyLen){
if (length == 0) { if (length == 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
(char*)result); (char*)result);
}else{
varDataSetLen(out, length);
memcpy(varDataVal(out), tagJsonValue, length);
} }
} else if (*(char*)result == cJSON_Number) { } else if (*(char*)result == cJSON_Number) {
double jsonVd = *(double*)(POINTER_SHIFT(result, CHAR_BYTES)); double jsonVd = *(double*)(POINTER_SHIFT(result, CHAR_BYTES));
sprintf(varDataVal(out), "%.9lf", jsonVd);
varDataSetLen(out, strlen(varDataVal(out)));
} else { } else {
tscError("unsupportted json value"); tscError("unsupportted json value");
} }
break;
} }
} }
} }
...@@ -5237,18 +5250,28 @@ char* parseTagDatatoJson(void *p){ ...@@ -5237,18 +5250,28 @@ char* parseTagDatatoJson(void *p){
} }
if (j%2 != 0) { // json key if (j%2 != 0) { // json key
memset(tagJsonKey, 0, TSDB_MAX_TAGS_LEN); memset(tagJsonKey, 0, TSDB_MAX_TAGS_LEN);
int32_t length = taosUcs4ToMbs(varDataVal(val), varDataLen(val), tagJsonKey); if (JSON_TYPE_BINARY){
if (length == 0) { strncpy(tagJsonKey, varDataVal(val), varDataLen(val));
tscError("charset:%s to %s. val:%s convert json key failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)val); } else if(JSON_TYPE_NCHAR){
goto end; int32_t length = taosUcs4ToMbs(varDataVal(val), varDataLen(val), tagJsonKey);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert json key failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)val);
goto end;
}
} }
}else{ // json value }else{ // json value
char tagJsonValue[TSDB_MAX_TAGS_LEN] = {0}; char tagJsonValue[TSDB_MAX_TAGS_LEN] = {0};
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
if(*(char*)val == cJSON_String){ if(*(char*)val == cJSON_String){
int32_t length = taosUcs4ToMbs(varDataVal(POINTER_SHIFT(val,CHAR_BYTES)), varDataLen(POINTER_SHIFT(val,CHAR_BYTES)), tagJsonValue); if (JSON_TYPE_BINARY){
if (length == 0) { strncpy(tagJsonValue, varDataVal(realData), varDataLen(realData), varDataLen(realData));
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)val); } else if(JSON_TYPE_NCHAR) {
goto end; int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
(char*)val);
goto end;
}
} }
cJSON* value = cJSON_CreateString(tagJsonValue); cJSON* value = cJSON_CreateString(tagJsonValue);
if (value == NULL) if (value == NULL)
...@@ -5257,7 +5280,7 @@ char* parseTagDatatoJson(void *p){ ...@@ -5257,7 +5280,7 @@ char* parseTagDatatoJson(void *p){
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(*(char*)val == cJSON_Number){ }else if(*(char*)val == cJSON_Number){
double jsonVd = *(double*)(POINTER_SHIFT(val,CHAR_BYTES)); double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd); cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) if (value == NULL)
{ {
...@@ -5299,28 +5322,39 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in ...@@ -5299,28 +5322,39 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in
goto end; goto end;
} }
char tagVal[TSDB_MAX_TAGS_LEN] = {0}; char tagVal[TSDB_MAX_TAGS_LEN] = {0};
int32_t output = 0; int32_t outLen = 0;
if (!taosMbsToUcs4(item->string, strlen(item->string), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) { if (JSON_TYPE_BINARY){
tscError("json string error:%s|%s", strerror(errno), item->string); strncpy(tagVal, item->string, strlen(item->string));
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL); outLen = strlen(item->string);
goto end; }else if(JSON_TYPE_NCHAR){
if (!taosMbsToUcs4(item->string, strlen(item->string), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &outLen)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
} }
varDataSetLen(tagVal, output); varDataSetLen(tagVal, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, false); // add json key tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, false); // add json key
memset(tagVal, 0, TSDB_MAX_TAGS_LEN); memset(tagVal, 0, TSDB_MAX_TAGS_LEN);
if(item->type == cJSON_String){ // add json value format: type|data if(item->type == cJSON_String){ // add json value format: type|data
output = 0; outLen = 0;
*tagVal = item->type; // type *tagVal = item->type; // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES); char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagData), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) { if (JSON_TYPE_BINARY){
tscError("json string error:%s|%s", strerror(errno), item->string); strncpy(tagVal, item->valuestring, strlen(item->valuestring));
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL); outLen = strlen(item->valuestring);
goto end; }else if(JSON_TYPE_NCHAR) {
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagData),
TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
} }
varDataSetLen(tagData, output); varDataSetLen(tagData, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, true); tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, true);
}else if(item->type == cJSON_Number){ }else if(item->type == cJSON_Number){
*tagVal = item->type; // type *tagVal = item->type; // type
......
...@@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) ...@@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(common ${SRC}) ADD_LIBRARY(common ${SRC})
......
...@@ -283,7 +283,7 @@ char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESS ...@@ -283,7 +283,7 @@ char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESS
int8_t tsDeadLockKillQuery = 0; int8_t tsDeadLockKillQuery = 0;
// default JSON string type // default JSON string type
char tsDefaultJSONStrType[7] = "binary"; char tsDefaultJSONStrType[7] = "nchar";
int32_t (*monStartSystemFp)() = NULL; int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL; void (*monStopSystemFp)() = NULL;
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "ttype.h" #include "ttype.h"
#include "tutil.h" #include "tutil.h"
#include "tvariant.h" #include "tvariant.h"
#include "tscUtil.h"
#define SET_EXT_INFO(converted, res, minv, maxv, exti) do { \ #define SET_EXT_INFO(converted, res, minv, maxv, exti) do { \
if (converted == NULL || exti == NULL || *converted == false) { break; } \ if (converted == NULL || exti == NULL || *converted == false) { break; } \
...@@ -105,6 +106,10 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) { ...@@ -105,6 +106,10 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
* @param type * @param type
*/ */
void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type) { void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type) {
if(type == TSDB_DATA_TYPE_JSON){
if(JSON_TYPE_BINARY) type = TSDB_DATA_TYPE_BINARY;
else if(JSON_TYPE_NCHAR) type = TSDB_DATA_TYPE_NCHAR;
}
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
...@@ -167,8 +172,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32 ...@@ -167,8 +172,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
break; break;
} }
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:{
case TSDB_DATA_TYPE_JSON:{ // todo refactor, extract a method
pVar->pz = calloc(len + 1, sizeof(char)); pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len); memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len; pVar->nLen = (int32_t)len;
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "tscLog.h" #include "tscLog.h"
#include "cJSON.h" #include "cJSON.h"
#include "tsdbMeta.h" #include "tsdbMeta.h"
#include "tscUtil.h"
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
...@@ -7174,24 +7175,30 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { ...@@ -7174,24 +7175,30 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
type = pExprInfo[j].base.resType; type = pExprInfo[j].base.resType;
bytes = pExprInfo[j].base.resBytes; bytes = pExprInfo[j].base.resBytes;
char* tagJsonElementData = NULL;
dst = pColInfo->pData + count * pExprInfo[j].base.resBytes;
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable); data = tsdbGetTableName(item->pTable);
} else { } else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes); data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
if(pExprInfo[j].base.numOfParams > 0){ // tag-> operation if(type == TSDB_DATA_TYPE_JSON){
STSchema *pSchema = tsdbGetTableTagSchema((STable*) item->pTable); if(pExprInfo[j].base.numOfParams > 0){ // tag-> operation
STColumn *pCol = tdGetColOfID(pSchema, pExprInfo[j].base.colInfo.colId); tagJsonElementData = calloc(bytes, 1);
if (pCol == NULL) { findTagValue(data, pExprInfo[j].base.param[0].pz, pExprInfo[j].base.param[0].nLen, tagJsonElementData);
continue; // No matched tag volumn *dst = SELECT_ELEMENT_JSON_TAG; // select tag->element
} dst++;
if(pCol->type == TSDB_DATA_TYPE_JSON){ }else{
*dst = SELECT_ALL_JSON_TAG; // select tag
dst++;
} }
} }
} }
if(tagJsonElementData != NULL){
dst = pColInfo->pData + count * pExprInfo[j].base.resBytes; doSetTagValueToResultBuf(dst, tagJsonElementData, type, bytes);
doSetTagValueToResultBuf(dst, data, type, bytes); tfree(tagJsonElementData);
}else{
doSetTagValueToResultBuf(dst, data, type, bytes);
}
} }
count += 1; count += 1;
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "ttoken.h" #include "ttoken.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tutil.h" #include "tutil.h"
#include "tscUtil.h"
SSqlInfo qSqlParse(const char *pStr) { SSqlInfo qSqlParse(const char *pStr) {
void *pParser = ParseAlloc(malloc); void *pParser = ParseAlloc(malloc);
...@@ -714,7 +715,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -714,7 +715,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
pField->type = i; pField->type = i;
pField->bytes = tDataTypes[i].bytes; pField->bytes = tDataTypes[i].bytes;
if (i == TSDB_DATA_TYPE_NCHAR) { if (i == TSDB_DATA_TYPE_NCHAR || (i == TSDB_DATA_TYPE_JSON && JSON_TYPE_NCHAR) {
/* /*
* for nchar, the TOKENTYPE is the number of character, so the length is the * for nchar, the TOKENTYPE is the number of character, so the length is the
* number of bytes in UCS-4 format, which is 4 times larger than the number of characters * number of bytes in UCS-4 format, which is 4 times larger than the number of characters
...@@ -731,7 +732,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -731,7 +732,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
} }
pField->bytes = (int16_t)bytes; pField->bytes = (int16_t)bytes;
} }
} else if (i == TSDB_DATA_TYPE_BINARY) { } else if (i == TSDB_DATA_TYPE_BINARY || (i == TSDB_DATA_TYPE_JSON && JSON_TYPE_BINARY) {
/* for binary, the TOKENTYPE is the length of binary */ /* for binary, the TOKENTYPE is the length of binary */
if (type->type == 0) { if (type->type == 0) {
pField->bytes = 0; pField->bytes = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册