提交 95fc626c 编写于 作者: wmmhello's avatar wmmhello

TD-6129<feature> add json tag support

上级 da48c584
......@@ -368,6 +368,8 @@ void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId);
#ifdef __cplusplus
}
#endif
......
......@@ -32,7 +32,6 @@
#include "ttoken.h"
#include "tdataformat.h"
#include "cJSON.h"
enum {
TSDB_USE_SERVER_TS = 0,
......@@ -1083,61 +1082,14 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
// encode json tag string
if(spd.numOfBound == 1 && pTagSchema[spd.boundedColumns[0]].type == TSDB_DATA_TYPE_JSON){
const char* msg1 = "json parse error";
char tmp = sToken.z[sToken.n];
sToken.z[sToken.n] = 0;
cJSON *root = cJSON_Parse(sToken.z);
if (root == NULL){
tscError("json parse error");
return false;
}
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root) || size == 0){
tscError("json error invalide value");
}
int jsonIndex = 0;
for(int i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) {
tscError("json inner error:%d", i);
continue;
}
char tagVal[TSDB_MAX_TAGS_LEN];
int32_t output = 0;
if (!taosMbsToUcs4(item->string, strlen(item->string), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return tscSQLSyntaxErrMsg(pInsertParam->msg, "serizelize json error", NULL);
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(&kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
if(item->type == cJSON_String){
output = 0;
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return tscSQLSyntaxErrMsg(pInsertParam->msg, "serizelize json error", NULL);
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(&kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
}else if(item->type == cJSON_Number){
*((double *)tagVal) = item->valuedouble;
tdAddColToKVRow(&kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal);
}else{
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalidate json value", NULL);
}
code = parseJsontoTagData(sToken.z, &kvRowBuilder, pInsertParam->msg, pTagSchema[spd.boundedColumns[0]].colId);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return code;
}
cJSON_Delete(root);
sToken.z[sToken.n] = tmp;
}
......
......@@ -905,6 +905,8 @@ static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MU
}
varDataSetLen(data + param->offset, output);
} else if (param->type == TSDB_DATA_TYPE_JSON) { // todo json
}
}
......
......@@ -1438,7 +1438,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
int32_t nLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pField = taosArrayGet(pFieldList, i);
if (!isValidDataType(pField->type)) {
if (!isValidDataType(pField->type) || pField->type == TSDB_DATA_TYPE_JSON) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
}
......@@ -1531,7 +1531,7 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
if (!isValidDataType(p->type) && p->type != TSDB_DATA_TYPE_JSON) {
if (!isValidDataType(p->type)) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
}
......@@ -7740,8 +7740,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow";
const char* msg5 = "tags number not matched";
const char* msg6 = "tags json invalidate";
const char* msg7 = "serizelize json error";
SSqlCmd* pCmd = &pSql->cmd;
......@@ -7939,57 +7937,11 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
tVariantListItem* pItem = taosArrayGet(pValList, 0);
cJSON *root = cJSON_Parse(pItem->pVar.pz);
if (root == NULL){
tscError("json parse error");
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root) || size == 0){
tscError("json error invalide value");
ret = parseJsontoTagData(pItem->pVar.pz, &kvRowBuilder, tscGetErrorMsgPayload(pCmd), pTagSchema[0].colId);
if (ret != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
int jsonIndex = 0;
for(int i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) {
tscError("json inner error:%d", i);
continue;
}
char tagVal[TSDB_MAX_TAGS_LEN];
int32_t output = 0;
if (!taosMbsToUcs4(item->string, strlen(item->string), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(&kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
if(item->type == cJSON_String){
output = 0;
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(&kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
}else if(item->type == cJSON_Number){
*((double *)tagVal) = item->valuedouble;
tdAddColToKVRow(&kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal);
}else{
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);}
return ret;
}
cJSON_Delete(root);
}
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
......
......@@ -30,6 +30,7 @@
#include "ttimer.h"
#include "ttokendef.h"
#include "httpInt.h"
#include "cJSON.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
......@@ -5150,3 +5151,64 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
return p;
}
int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, int16_t startColId){
cJSON *root = cJSON_Parse(json);
if (root == NULL){
tscError("json parse error");
return tscSQLSyntaxErrMsg(errMsg, "json parse error", NULL);
}
int retCode = 0;
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root) || size == 0){
tscError("json error invalide value");
retCode = tscSQLSyntaxErrMsg(errMsg, "json error invalide value", NULL);
goto end;
}
int jsonIndex = startColId++;
for(int i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) {
tscError("json inner error:%d", i);
retCode = tscSQLSyntaxErrMsg(errMsg, "json inner error", NULL);
goto end;
}
char tagVal[TSDB_MAX_TAGS_LEN];
int32_t output = 0;
if (!taosMbsToUcs4(item->string, strlen(item->string), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
memset(tagVal, 0, TSDB_MAX_TAGS_LEN);
if(item->type == cJSON_String){
output = 0;
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
}else if(item->type == cJSON_Number){
*((double *)tagVal) = item->valuedouble;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal);
}else{
retCode = tscSQLSyntaxErrMsg(errMsg, "invalidate json value", NULL);
goto end;
}
}
end:
cJSON_Delete(root);
return retCode;
}
......@@ -193,7 +193,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
return false;
}
// 1. valid types
if (!isValidDataType(pSchema[i].type) && pSchema[i].type != TSDB_DATA_TYPE_JSON) {
if (!isValidDataType(pSchema[i].type)) {
return false;
}
......
......@@ -430,7 +430,7 @@ FORCE_INLINE void* getDataMax(int32_t type) {
bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT;
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_JSON;
}
void setVardataNull(void* val, int32_t type) {
......
......@@ -28,6 +28,7 @@ typedef struct STable {
STSchema* tagSchema;
SKVRow tagVal;
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
SHashObj* jsonKeyMap; // For json tag key {"key":[t1, t2, t3]}
void* eventHandler; // TODO
void* streamHandler; // TODO
TSKEY lastKey;
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
#include "tcompare.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5
#define DEFAULT_TAG_INDEX_COLUMN 0
......@@ -1051,7 +1052,40 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
pTable->pSuper = pSTable;
tSkipListPut(pSTable->pIndex, (void *)pTable);
if(pSTable->tagSchema->numOfCols == 1 && pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
int16_t nCols = kvRowNCols(pTable->tagVal);
ASSERT(nCols%2 == 1);
for (int j = 0; j < nCols; ++j) {
SColIdx * pColIdx = kvRowColIdxAt(pTable->tagVal, j);
void* val = (kvRowColVal(pTable->tagVal, pColIdx));
if (j == 0){ // json value is the first
int8_t jsonVal = *(int8_t*)val;
ASSERT(jsonVal == TSDB_DATA_BINARY_PLACEHOLDER);
continue;
}
void* tablist = taosHashGet(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val));
if(tablist == NULL) {
tablist = taosArrayInit(8, sizeof(uint64_t));
if(tablist == NULL){
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("out of memory when alloc json tag array");
return -1;
}
}
taosArrayPush(tablist, &TABLE_UID(pTable));
taosArraySort(tablist, compareUint64Val);
taosArrayRemoveDuplicate(tablist, compareUint64Val, NULL);
if(taosHashPut(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val), tablist, sizeof(void*)) < 0){
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("out of memory when put json tag array");
return -1;
}
}
}else{
tSkipListPut(pSTable->pIndex, (void *)pTable);
}
if (refSuper) T_REF_INC(pSTable);
return 0;
......@@ -1316,12 +1350,17 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
buf = tdDecodeSchema(buf, &(pTable->tagSchema));
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
if(pCol->type == TSDB_DATA_TYPE_JSON){
assert(pTable->tagSchema->numOfCols == 1);
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
}else{
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable);
return NULL;
if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable);
return NULL;
}
}
}
......
......@@ -88,6 +88,8 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
*/
void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
/**
* return the size of hash table
* @param pHashObj
......
......@@ -203,7 +203,13 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp) {
if (pHashObj != NULL && fp != NULL) {
pHashObj->equalFp = fp;
}
}
}
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp) {
if (pHashObj != NULL && fp != NULL) {
pHashObj->freeFp = fp;
}
}
int32_t taosHashGetSize(const SHashObj *pHashObj) {
if (!pHashObj) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册