提交 c7ea4736 编写于 作者: wmmhello's avatar wmmhello

TD-6129<feature> add tag-> select logic

上级 f4efb6b1
......@@ -377,7 +377,7 @@ char* cloneCurrentDBName(SSqlObj* pSql);
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId);
char* parseTagDatatoJson(void *p);
void findTagValue(void* data, char* key, int32_t keyLen, char* out);
void findTagValue(void* data, char* key, int32_t keyLen, char* out, int16_t len);
#ifdef __cplusplus
}
......
......@@ -388,7 +388,7 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
break;
case TSDB_DATA_TYPE_JSON:
if (pToken->n > TSDB_MAX_TAGS_LEN) {
if (pToken->n >= pSchema->bytes) { // reserve 1 byte for select
return tscInvalidOperationMsg(msg, "json tag length too long", pToken->z);
}
if (pToken->type == TK_NULL) {
......@@ -1111,7 +1111,14 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (pInsertParam->tagData.dataLen <= 0){
return tscSQLSyntaxErrMsg(pInsertParam->msg, "tag value expected", NULL);
}
// encode json tag string
if(spd.numOfBound == 1 && pTagSchema[spd.boundedColumns[0]].type == TSDB_DATA_TYPE_JSON){
if(kvRowLen(row) >= pTagSchema[spd.boundedColumns[0]].bytes){ // reserve 1 byte for select
char tmp[128]= {0};
sprintf(tmp, "tag value is too small, can not contain encoded json tag:%d|%d", kvRowLen(row), pTagSchema[spd.boundedColumns[0]].bytes);
return tscSQLSyntaxErrMsg(pInsertParam->msg, tmp, NULL);
}
}
char* pTag = realloc(pInsertParam->tagData.data, pInsertParam->tagData.dataLen);
if (pTag == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -6131,6 +6131,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tdSortKVRowByColIdx(row);
if(kvRowLen(row) >= pTagsSchema->bytes){ // reserve 1 byte for select
char tmp[128]= {0};
sprintf(tmp, "tag value is too small, can not contain encoded json tag:%d|%d", kvRowLen(row), pTagsSchema->bytes);
return invalidOperationMsg(pMsg, tmp);
}
kvRowCpy(pUpdateMsg->data + schemaLen, row);
free(row);
}else{
......@@ -7707,7 +7713,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
pItem->pVar.i64 = convertTimePrecision(pItem->pVar.i64, TSDB_TIME_PRECISION_NANO, tinfo.precision);
}
} else if (pSchema->type == TSDB_DATA_TYPE_JSON) {
if (pItem->pVar.nLen > TSDB_MAX_TAGS_LEN) {
if (pItem->pVar.nLen > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -7759,6 +7765,14 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
}
tdSortKVRowByColIdx(row);
pTag->dataLen = kvRowLen(row);
if(schemaSize == 1 && pTagSchema[0].type == TSDB_DATA_TYPE_JSON){
if(kvRowLen(row) >= pTagSchema[0].bytes){ // reserve 1 byte for select
char tmp[128]= {0};
sprintf(tmp, "tag value is too small, can not contain encoded json tag:%d|%d", kvRowLen(row), pTagSchema[0].bytes);
free(row);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), tmp);
}
}
if (pTag->data == NULL) {
pTag->data = malloc(pTag->dataLen);
......
......@@ -5178,7 +5178,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
return p;
}
void findTagValue(void* data, char* key, int32_t keyLen, char* out){
void findTagValue(void* data, char* key, int32_t keyLen, char* out, int16_t len){
int16_t nCols = kvRowNCols(data);
bool found = false;
......@@ -5205,21 +5205,30 @@ void findTagValue(void* data, char* key, int32_t keyLen, char* out){
found = true;
} else { // json value
if (!found) continue;
char* realData = POINTER_SHIFT(result, CHAR_BYTES);
if (*(char*)result == cJSON_String) {
char tagJsonValue[TSDB_MAX_TAGS_LEN] = {0};
int32_t length = taosUcs4ToMbs(varDataVal(POINTER_SHIFT(result, CHAR_BYTES)),
varDataLen(POINTER_SHIFT(result, CHAR_BYTES)), tagJsonValue);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
(char*)result);
}else{
varDataSetLen(out, length);
memcpy(varDataVal(out), tagJsonValue, length);
if (JSON_TYPE_BINARY){
assert(varDataLen(realData) <= len);
memcpy(varDataVal(out), varDataVal(realData), varDataLen(realData));
varDataSetLen(out, varDataLen(realData));
} else if(JSON_TYPE_NCHAR) {
char tagJsonValue[TSDB_MAX_TAGS_LEN] = {0};
int32_t length = taosUcs4ToMbs(varDataVal(realData),
varDataLen(realData), tagJsonValue);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
(char*)result);
} else {
assert(length <= len);
varDataSetLen(out, length);
memcpy(varDataVal(out), tagJsonValue, length);
}
}
} else if (*(char*)result == cJSON_Number) {
double jsonVd = *(double*)(POINTER_SHIFT(result, CHAR_BYTES));
double jsonVd = *(double*)(realData);
sprintf(varDataVal(out), "%.9lf", jsonVd);
assert(strlen(varDataVal(out)) <= len);
varDataSetLen(out, strlen(varDataVal(out)));
} else {
tscError("unsupportted json value");
......
......@@ -369,7 +369,7 @@ static void getStatics_nchr(const void *pData, int32_t numOfRow, int64_t *min, i
}
tDataTypeDescriptor tDataTypes[16] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool, tsDecompressBool, getStatics_bool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
......@@ -377,14 +377,14 @@ tDataTypeDescriptor tDataTypes[16] = {
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_u16},
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_u64},
{TSDB_DATA_TYPE_JSON, 4,TSDB_MAX_NCHAR_LEN, "JSON", INT8_MIN, TSDB_MAX_NCHAR_LEN, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_JSON, 4, 0, "JSON", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
};
char tTokenTypeSwitcher[13] = {
......
......@@ -3344,7 +3344,7 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag,
val = tsdbGetTableName(pTable);
assert(val != NULL);
} else {
val = tsdbGetTableTagVal(pTable, tagColId, type, bytes); // todo json
val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
}
if (val == NULL || isNull(val, type)) {
......@@ -3357,7 +3357,13 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag,
int32_t len = (varDataLen(val) > maxLen)? maxLen:varDataLen(val);
tVariantCreateFromBinary(tag, varDataVal(val), len, type);
//tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
} else {
} else if(type == TSDB_DATA_TYPE_JSON){
assert(kvRowLen(val) < bytes);
tVariantCreateFromBinary(tag, val, bytes, type);
memcpy(tag->pz + 1, tag->pz, bytes - 1); // move back 1 byte for select type
*(tag->pz) = SELECT_ALL_JSON_TAG;
}
else {
tVariantCreateFromBinary(tag, val, bytes, type);
}
}
......@@ -7139,7 +7145,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes); //todo json
}
doSetTagValueToResultBuf(output, data, type, bytes);
......@@ -7179,26 +7185,28 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
dst = pColInfo->pData + count * pExprInfo[j].base.resBytes;
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
if(type == TSDB_DATA_TYPE_JSON){
if(pExprInfo[j].base.numOfParams > 0){ // tag-> operation
tagJsonElementData = calloc(bytes, 1);
findTagValue(data, pExprInfo[j].base.param[0].pz, pExprInfo[j].base.param[0].nLen, tagJsonElementData);
findTagValue(data, pExprInfo[j].base.param[0].pz, pExprInfo[j].base.param[0].nLen, tagJsonElementData, bytes);
*dst = SELECT_ELEMENT_JSON_TAG; // select tag->element
dst++;
assert(varDataTLen(tagJsonElementData) < bytes);
doSetTagValueToResultBuf(dst, tagJsonElementData, type, bytes - 1);
tfree(tagJsonElementData);
}else{
*dst = SELECT_ALL_JSON_TAG; // select tag
dst++;
assert(kvRowLen(data) < bytes);
doSetTagValueToResultBuf(dst, data, type, bytes - 1);
}
continue;
}
}
if(tagJsonElementData != NULL){
doSetTagValueToResultBuf(dst, tagJsonElementData, type, bytes);
tfree(tagJsonElementData);
}else{
doSetTagValueToResultBuf(dst, data, type, bytes);
}
doSetTagValueToResultBuf(dst, data, type, bytes);
}
count += 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册