提交 54060691 编写于 作者: wmmhello's avatar wmmhello

TD-6129<feature> add json tag support

上级 95fc626c
......@@ -1583,8 +1583,6 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return false;
}
if (p->type == TSDB_DATA_TYPE_JSON && validataTagJson
}
return true;
......@@ -6297,17 +6295,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidOperationMsg(pMsg, msg14);
}
pAlterSQL->tagData.data = calloc(1, pTagsSchema->bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
if (tVariantDump(&pItem->pVar, pAlterSQL->tagData.data, pTagsSchema->type, true) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg13);
}
pAlterSQL->tagData.dataLen = pTagsSchema->bytes;
// validate the length of binary
if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) &&
varDataTLen(pAlterSQL->tagData.data) > pTagsSchema->bytes) {
if (pTagsSchema->type == TSDB_DATA_TYPE_JSON && (pItem->pVar.nLen > TSDB_MAX_TAGS_LEN)) {
return invalidOperationMsg(pMsg, msg14);
}
......@@ -6344,14 +6332,17 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
// copy the tag value to pMsg body
pItem = taosArrayGet(pVarList, 1);
tVariantDump(&pItem->pVar, pUpdateMsg->data + schemaLen, pTagsSchema->type, true);
if (tVariantDump(&pItem->pVar, pUpdateMsg->data + schemaLen, pTagsSchema->type, true)
!= TSDB_CODE_SUCCESS){
return invalidOperationMsg(pMsg, msg13);
}
int32_t len = 0;
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
if (!IS_VAR_DATA_TYPE(pTagsSchema->type)) {
len = tDataTypes[pTagsSchema->type].bytes;
} else {
len = varDataTLen(pUpdateMsg->data + schemaLen);
if(len > pTagsSchema->bytes) return invalidOperationMsg(pMsg, msg14);
}
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
......
......@@ -5176,7 +5176,7 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in
retCode = tscSQLSyntaxErrMsg(errMsg, "json inner error", NULL);
goto end;
}
char tagVal[TSDB_MAX_TAGS_LEN];
char tagVal[TSDB_MAX_TAGS_LEN] = {0};
int32_t output = 0;
if (!taosMbsToUcs4(item->string, strlen(item->string), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
......@@ -5185,22 +5185,25 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in
}
varDataSetLen(tagVal, output);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal); // add json key
memset(tagVal, 0, TSDB_MAX_TAGS_LEN);
if(item->type == cJSON_String){
if(item->type == cJSON_String){ // add json value format: type|data
output = 0;
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagVal), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
*tagVal = item->type; // type
char* tagData = tagVal + CHAR_BYTES;
if (!taosMbsToUcs4(item->valuestring, strlen(item->valuestring), varDataVal(tagData), TSDB_MAX_TAGS_LEN - VARSTR_HEADER_SIZE, &output)) {
tscError("json string error:%s|%s", strerror(errno), item->string);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
goto end;
}
varDataSetLen(tagVal, output);
varDataSetLen(tagData, output);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal);
}else if(item->type == cJSON_Number){
*((double *)tagVal) = item->valuedouble;
*tagVal = item->type; // type
char* tagData = tagVal + CHAR_BYTES;
*((double *)tagData) = item->valuedouble;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_BIGINT, tagVal);
}else{
retCode = tscSQLSyntaxErrMsg(errMsg, "invalidate json value", NULL);
......
......@@ -155,7 +155,6 @@ typedef struct SAlterTableInfo {
SStrToken name;
int16_t tableType;
int16_t type;
STagData tagData;
SArray *pAddColumns; // SArray<TAOS_FIELD>
SArray *varList; // set t=val or: change src dst, SArray<tVariantListItem>
} SAlterTableInfo;
......
......@@ -952,7 +952,6 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeVariant);
taosArrayDestroy(pInfo->pAlterInfo->pAddColumns);
tfree(pInfo->pAlterInfo->tagData.data);
tfree(pInfo->pAlterInfo);
} else if (pInfo->type == TSDB_SQL_COMPACT_VNODE) {
tSqlExprListDestroy(pInfo->list);
......
......@@ -18,6 +18,14 @@
#define TSDB_MAX_TABLE_SCHEMAS 16
#pragma pack (push,1)
typedef struct jsonMapValue{
uint64_t uid; // the unique table ID
int16_t colId; // the json col ID.
}JsonMapValue;
#pragma pack (pop)
typedef struct STable {
STableId tableId;
ETableType type;
......
......@@ -389,7 +389,8 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
TSDB_WUNLOCK_TABLE(pTable->pSuper);
}
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)))
|| pMsg->type == TSDB_DATA_TYPE_JSON;
// STColumn *pCol = bsearch(&(pMsg->colId), pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
// ASSERT(pCol != NULL);
......@@ -398,7 +399,12 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
tsdbRemoveTableFromIndex(pMeta, pTable);
}
TSDB_WLOCK_TABLE(pTable);
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
if (pMsg->type == TSDB_DATA_TYPE_JSON){
kvRowFree(pTable->tagVal);
pTable->tagVal = tdKVRowDup(POINTER_SHIFT(pMsg->data, pMsg->schemaLen))
}else{
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
}
TSDB_WUNLOCK_TABLE(pTable);
if (isChangeIndexCol) {
tsdbAddTableIntoIndex(pMeta, pTable, false);
......@@ -847,11 +853,21 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
}
pTable->tagVal = NULL;
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
if(pCol->type == TSDB_DATA_TYPE_JSON){
assert(pTable->tagSchema->numOfCols == 1);
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pTable->jsonKeyMap == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable);
return NULL;
}
}else{
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
}
} else {
pTable->type = pCfg->type;
......@@ -904,6 +920,21 @@ _err:
return NULL;
}
static void* tsdbDestroyTagJsonHashTable(SHashObj* hashObj) {
if (hashObj == NULL) {
return NULL;
}
SArray * p = taosHashIterate(hashObj, NULL);
while(p) {
taosArrayDestroy(p);
p = taosHashIterate(hashObj, p);
}
taosHashCleanup(hashObj);
return NULL;
}
static void tsdbFreeTable(STable *pTable) {
if (pTable) {
if (pTable->name != NULL)
......@@ -921,6 +952,7 @@ static void tsdbFreeTable(STable *pTable) {
kvRowFree(pTable->tagVal);
tSkipListDestroy(pTable->pIndex);
tsdbDestroyTagJsonHashTable(pTable->jsonKeyMap);
taosTZfree(pTable->lastRow);
tfree(pTable->sql);
......@@ -1045,6 +1077,14 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
tsdbUnRefTable(pTable);
}
static int tscCompareJsonMapValue(const void* a, const void* b) {
const JsonMapValue* x = (const JsonMapValue*)a;
const JsonMapValue* y = (const JsonMapValue*)b;
if (x->uid > y->uid) return 1;
if (x->uid < y->uid) return -1;
return 0;
}
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper) {
ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable *pSTable = tsdbGetTableByUid(pMeta, TABLE_SUID(pTable));
......@@ -1052,10 +1092,12 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
pTable->pSuper = pSTable;
if(pSTable->tagSchema->numOfCols == 1 && pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
if(pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
ASSERT(pSTable->tagSchema->numOfCols == 1);
int16_t nCols = kvRowNCols(pTable->tagVal);
ASSERT(nCols%2 == 1);
for (int j = 0; j < nCols; ++j) {
if (j != 0 && j%2 == 0) continue; // jump value
SColIdx * pColIdx = kvRowColIdxAt(pTable->tagVal, j);
void* val = (kvRowColVal(pTable->tagVal, pColIdx));
if (j == 0){ // json value is the first
......@@ -1064,24 +1106,24 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper
continue;
}
void* tablist = taosHashGet(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val));
SArray** tablist = taosHashGet(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val));
if(tablist == NULL) {
tablist = taosArrayInit(8, sizeof(uint64_t));
if(tablist == NULL){
void* tablistNew = taosArrayInit(8, sizeof(JsonMapValue));
if(tablistNew == NULL){
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("out of memory when alloc json tag array");
return -1;
}
if(taosHashPut(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val), &tablistNew, sizeof(void*)) < 0){
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("out of memory when put json tag array");
return -1;
}
tablist = (SArray**)&tablistNew;
}
taosArrayPush(tablist, &TABLE_UID(pTable));
taosArraySort(tablist, compareUint64Val);
taosArrayRemoveDuplicate(tablist, compareUint64Val, NULL);
if(taosHashPut(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val), tablist, sizeof(void*)) < 0){
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("out of memory when put json tag array");
return -1;
}
JsonMapValue jmvalue = {TABLE_UID(pTable), pColIdx->colId};
taosArrayPush(*tablist, &jmvalue);
taosArraySort(*tablist, tscCompareJsonMapValue);
}
}else{
tSkipListPut(pSTable->pIndex, (void *)pTable);
......@@ -1097,22 +1139,52 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
STable *pSTable = pTable->pSuper;
ASSERT(pSTable != NULL);
char* key = getTagIndexKey(pTable);
SArray *res = tSkipListGet(pSTable->pIndex, key);
if(pSTable->tagSchema->columns[0].type == TSDB_DATA_TYPE_JSON){
ASSERT(pSTable->tagSchema->numOfCols == 1);
int16_t nCols = kvRowNCols(pTable->tagVal);
ASSERT(nCols%2 == 1);
for (int j = 0; j < nCols; ++j) {
if (j != 0 && j%2 == 0) continue; // jump value
SColIdx * pColIdx = kvRowColIdxAt(pTable->tagVal, j);
void* val = (kvRowColVal(pTable->tagVal, pColIdx));
if (j == 0){ // json value is the first
int8_t jsonVal = *(int8_t*)val;
ASSERT(jsonVal == TSDB_DATA_BINARY_PLACEHOLDER);
continue;
}
size_t size = taosArrayGetSize(res);
ASSERT(size > 0);
SArray** tablist = taosHashGet(pSTable->jsonKeyMap, varDataVal(val) ,varDataLen(val));
if(tablist == NULL) {
tsdbError("json tag no key error,%d", j);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSkipListNode *pNode = taosArrayGetP(res, i);
JsonMapValue jmvalue = {TABLE_UID(pTable), pColIdx->colId};
void* p = taosArraySearch(*tablist, &jmvalue, tscCompareJsonMapValue, TD_EQ);
if (p == NULL) {
tsdbError("json tag no tableid error,%d", j);
continue;
}
taosArrayRemove(*tablist, TARRAY_ELEM_IDX(*tablist, p));
}
}else {
char * key = getTagIndexKey(pTable);
SArray *res = tSkipListGet(pSTable->pIndex, key);
// STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
if ((STable *)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need
tSkipListRemoveNode(pSTable->pIndex, pNode);
size_t size = taosArrayGetSize(res);
ASSERT(size > 0);
for (int32_t i = 0; i < size; ++i) {
SSkipListNode *pNode = taosArrayGetP(res, i);
// STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
if ((STable *)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need
tSkipListRemoveNode(pSTable->pIndex, pNode);
}
}
}
taosArrayDestroy(res);
taosArrayDestroy(res);
}
return 0;
}
......@@ -1352,7 +1424,12 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
if(pCol->type == TSDB_DATA_TYPE_JSON){
assert(pTable->tagSchema->numOfCols == 1);
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
pTable->jsonKeyMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pTable->jsonKeyMap == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeTable(pTable);
return NULL;
}
}else{
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
SL_ALLOW_DUP_KEY, getTagIndexKey);
......
......@@ -617,7 +617,7 @@ void taosHashCleanup(SHashObj *pHashObj) {
taosArrayDestroy(pHashObj->pMemBlock);
memset(pHashObj, 0, sizeof(SHashObj));
free(pHashObj);
tfree(pHashObj);
}
// for profile only
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册