提交 8494bf17 编写于 作者: wmmhello's avatar wmmhello

opt:partition by tag

上级 742cccba
...@@ -976,7 +976,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -976,7 +976,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SDecoder dc = {0}; SDecoder dc = {0};
// get super table // get super table
tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData); if(tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0){
return -1;
}
tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = *(int64_t *)pData; tbDbKey.version = *(int64_t *)pData;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData); tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
......
...@@ -3915,24 +3915,100 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI ...@@ -3915,24 +3915,100 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) { static EDealRes doTranslateGroupExpr(SNode** pNode, void* pContext) {
if (groupKey == NULL) { SMetaReader* mr = (SMetaReader*)pContext;
if(nodeType(*pNode) == QUERY_NODE_COLUMN){
SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
if (strcmp(pSColumnNode->colName, "tbname") == 0) {
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), mr->me.name, len);
varDataSetLen(res->datum.p, len);
} else {
STagVal tagVal = {0};
tagVal.cid = pSColumnNode->colId;
const char* p = metaGetTableTagVal(&mr->me, pSColumnNode->node.resType.type, &tagVal);
if (p == NULL) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
}else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
int32_t len = ((const STag*)p) -> len;
res->datum.p = taosMemoryCalloc(len + 1, 1);
memcpy(res->datum.p, p, len);
} else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
varDataSetLen(res->datum.p, tagVal.nData);
} else {
nodesSetValueNodeValue(res, &(tagVal.i64));
}
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
}
return DEAL_RES_CONTINUE;
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
if (group == NULL) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
// SSDataBlock data = {0};
// data.info.numOfCols = 3;
// data.info.rows = rowNum;
// data.pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
// for (int32_t i = 0; i < 2; ++i) {
// SColumnInfoData idata = {{0}};
// idata.info.type = TSDB_DATA_TYPE_NULL;
// idata.info.bytes = 10;
// idata.info.colId = i + 1;
//
// int32_t size = idata.info.bytes * rowNum;
// idata.pData = (char *)taosMemoryCalloc(1, size);
// taosArrayPush(res->pDataBlock, &idata);
// }
//
// SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
// taosArrayPush(pBlockList, &src);
//
// SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
// SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
//
// SScalarParam dest = {.columnData = &idata};
// int32_t code = scalarCalculate(group, pBlockList, &dest);
// if (code != TSDB_CODE_SUCCESS) {
// taosArrayDestroy(pBlockList);
// return code;
// }
//
//
// numOfRows = dest.numOfRows;
// taosArrayDestroy(pBlockList);
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pTableListInfo->map == NULL) { if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t keyLen = 0; int32_t keyLen = 0;
void* keyBuf = NULL; void* keyBuf = NULL;
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
for (int32_t j = 0; j < numOfGroupCols; ++j) { SNode* node;
SColumn* pCol = taosArrayGet(groupKey, j); FOREACH(node, group) {
keyLen += pCol->bytes; // actual data + null_flag SExprNode *pExpr = (SExprNode *)node;
keyLen += pExpr->resType.bytes;
} }
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
keyLen += nullFlagSize; keyLen += nullFlagSize;
keyBuf = taosMemoryCalloc(1, keyLen); keyBuf = taosMemoryCalloc(1, keyLen);
...@@ -3946,35 +4022,39 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3946,35 +4022,39 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid); metaGetTableEntryByUid(&mr, info->uid);
SNodeList *groupNew = nodesCloneList(group);
nodesRewriteExprsPostOrder(groupNew, doTranslateGroupExpr, &mr);
char* isNull = (char*)keyBuf; char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + sizeof(int8_t) * numOfGroupCols; char* pStart = (char*)keyBuf + nullFlagSize;
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j); SNode* pNode;
int32_t index = 0;
if (strcmp(pCol->name, "tbname") == 0) { FOREACH(pNode, groupNew){
isNull[i] = 0; SNode* pNew = NULL;
memcpy(pStart, mr.me.name, strlen(mr.me.name)); int32_t code = scalarCalculateConstants(pNode, &pNew);
pStart += strlen(mr.me.name); if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(pNew);
} else { } else {
STagVal tagVal = {0}; nodesClearList(groupNew);
tagVal.cid = pCol->colId; return code;
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal); }
if (p == NULL) {
isNull[j] = 1; ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
continue; SValueNode *pValue = (SValueNode *)pNew;
}
isNull[i] = 0; if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
if (pCol->type == TSDB_DATA_TYPE_JSON) { isNull[index++] = 1;
// int32_t dataLen = getJsonValueLen(pkey->pData); continue;
// memcpy(pStart, (pkey->pData), dataLen); } else {
// pStart += dataLen; isNull[index++] = 0;
} else if (IS_VAR_DATA_TYPE(pCol->type)) { char* data = nodesGetValueFromNode(pValue);
memcpy(pStart, tagVal.pData, tagVal.nData); if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
pStart += tagVal.nData; memcpy(pStart, data, varDataTLen(data));
ASSERT(tagVal.nData <= pCol->bytes); pStart += varDataTLen(data);
} else { } else {
memcpy(pStart, &(tagVal.i64), pCol->bytes); memcpy(pStart, data, pValue->node.resType.bytes);
pStart += pCol->bytes; pStart += pValue->node.resType.bytes;
} }
} }
} }
...@@ -3987,7 +4067,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3987,7 +4067,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
uint64_t tmpId = calcGroupId(keyBuf, len); uint64_t tmpId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t));
} }
nodesClearList(groupNew);
metaReaderClear(&mr); metaReaderClear(&mr);
} }
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
...@@ -4016,14 +4096,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4016,14 +4096,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno;
return NULL;
}
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
...@@ -4035,9 +4108,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4035,9 +4108,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
...@@ -4063,13 +4134,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4063,13 +4134,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
} }
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
......
...@@ -1121,6 +1121,7 @@ void* nodesGetValueFromNode(SValueNode* pNode) { ...@@ -1121,6 +1121,7 @@ void* nodesGetValueFromNode(SValueNode* pNode) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_JSON:
return (void*)pNode->datum.p; return (void*)pNode->datum.p;
default: default:
break; break;
...@@ -1182,6 +1183,7 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) { ...@@ -1182,6 +1183,7 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_JSON:
pNode->datum.p = (char*)value; pNode->datum.p = (char*)value;
break; break;
default: default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册