提交 ea485e98 编写于 作者: L liuyao

opt partition by tbname

上级 b2d141fe
...@@ -103,6 +103,7 @@ typedef struct SScanLogicNode { ...@@ -103,6 +103,7 @@ typedef struct SScanLogicNode {
bool hasNormalCols; // neither tag column nor primary key tag column bool hasNormalCols; // neither tag column nor primary key tag column
bool sortPrimaryKey; bool sortPrimaryKey;
bool igLastNull; bool igLastNull;
char* stbFullTableName;
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode { typedef struct SJoinLogicNode {
...@@ -253,6 +254,7 @@ typedef struct SPartitionLogicNode { ...@@ -253,6 +254,7 @@ typedef struct SPartitionLogicNode {
SNodeList* pPartitionKeys; SNodeList* pPartitionKeys;
SNodeList* pTags; SNodeList* pTags;
SNode* pSubtable; SNode* pSubtable;
char* stbFullTableName;
} SPartitionLogicNode; } SPartitionLogicNode;
typedef enum ESubplanType { typedef enum ESubplanType {
...@@ -367,6 +369,7 @@ typedef struct STableScanPhysiNode { ...@@ -367,6 +369,7 @@ typedef struct STableScanPhysiNode {
int8_t igExpired; int8_t igExpired;
bool assignBlockUid; bool assignBlockUid;
int8_t igCheckUpdate; int8_t igCheckUpdate;
char stbFullTableName[TSDB_TABLE_NAME_LEN];
} STableScanPhysiNode; } STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode;
...@@ -538,6 +541,7 @@ typedef struct SStreamPartitionPhysiNode { ...@@ -538,6 +541,7 @@ typedef struct SStreamPartitionPhysiNode {
SPartitionPhysiNode part; SPartitionPhysiNode part;
SNodeList* pTags; SNodeList* pTags;
SNode* pSubtable; SNode* pSubtable;
char stbFullTableName[TSDB_TABLE_NAME_LEN];
} SStreamPartitionPhysiNode; } SStreamPartitionPhysiNode;
typedef struct SDataSinkNode { typedef struct SDataSinkNode {
......
...@@ -42,6 +42,7 @@ typedef struct SPlanContext { ...@@ -42,6 +42,7 @@ typedef struct SPlanContext {
const char* pUser; const char* pUser;
bool sysInfo; bool sysInfo;
int64_t allocatorId; int64_t allocatorId;
char* stbFullTableName;
} SPlanContext; } SPlanContext;
// Create the physical plan for the query, according to the AST. // Create the physical plan for the query, according to the AST.
......
...@@ -239,6 +239,7 @@ typedef struct { ...@@ -239,6 +239,7 @@ typedef struct {
void* vnode; // not available to encoder and decoder void* vnode; // not available to encoder and decoder
FTbSink* tbSinkFunc; FTbSink* tbSinkFunc;
STSchema* pTSchema; STSchema* pTSchema;
SSHashObj* pUidInfo;
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
......
...@@ -392,6 +392,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -392,6 +392,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.igExpired = pObj->igExpired, .igExpired = pObj->igExpired,
.deleteMark = pObj->deleteMark, .deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate, .igCheckUpdate = pObj->igCheckUpdate,
.stbFullTableName = pObj->targetSTbName,
}; };
// using ast and param to build physical plan // using ast and param to build physical plan
......
...@@ -646,6 +646,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -646,6 +646,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->tbSink.pTSchema == NULL) { if (pTask->tbSink.pTSchema == NULL) {
return -1; return -1;
} }
pTask->tbSink.pUidInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
} }
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
......
...@@ -90,6 +90,15 @@ end: ...@@ -90,6 +90,15 @@ end:
return ret; return ret;
} }
int32_t getTblUid(SSHashObj* tblInfo ,char* tbName, int64_t* uid) {
void* pVal = tSimpleHashGet(tblInfo, tbName, strlen(tbName));
if (pVal) {
*uid = *(int64_t*)pVal;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL; void* buf = NULL;
int32_t tlen = 0; int32_t tlen = 0;
...@@ -261,99 +270,94 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -261,99 +270,94 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.uid = 0; // uid is assigned by vnode tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version; tbData.sver = pTSchema->version;
char* ctbName = NULL; char* ctbName = pDataBlock->info.parTbName;
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); if (!pDataBlock->info.parTbName[0]) {
if (pDataBlock->info.parTbName[0]) { char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
ctbName = taosStrdup(pDataBlock->info.parTbName); memcpy(ctbName, tmp, strlen(tmp));
} else { taosMemoryFree(tmp);
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); tqError("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), pDataBlock->info.id.groupId);
} }
SMetaReader mr = {0}; int32_t res = getTblUid(pTask->tbSink.pUidInfo, ctbName, &tbData.uid);
metaReaderInit(&mr, pVnode->pMeta, 0); if (res != TSDB_CODE_SUCCESS) {
if (metaGetTableEntryByName(&mr, ctbName) < 0) { SMetaReader mr = {0};
metaReaderClear(&mr); metaReaderInit(&mr, pVnode->pMeta, 0);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq* pCreateTbReq = NULL; SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
taosMemoryFree(ctbName); goto _end;
goto _end; };
};
// set const // set const
pCreateTbReq->flags = 0; pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE; pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid; pCreateTbReq->ctb.suid = suid;
// set super table name // set super table name
SName name = {0}; SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
// set tag content // set tag content
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
taosMemoryFree(ctbName); tdDestroySVCreateTbReq(pCreateTbReq);
tdDestroySVCreateTbReq(pCreateTbReq); goto _end;
goto _end; }
} STagVal tagVal = {
STagVal tagVal = { .cid = pTSchema->numOfCols + 1,
.cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT,
.type = TSDB_DATA_TYPE_UBIGINT, .i64 = (int64_t)pDataBlock->info.id.groupId,
.i64 = (int64_t)pDataBlock->info.id.groupId, };
}; taosArrayPush(tagArray, &tagVal);
taosArrayPush(tagArray, &tagVal); pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL; STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pTag == NULL) {
taosMemoryFree(ctbName); tdDestroySVCreateTbReq(pCreateTbReq);
tdDestroySVCreateTbReq(pCreateTbReq); terrno = TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY; goto _end;
taosMemoryFree(ctbName); }
tdDestroySVCreateTbReq(pCreateTbReq); pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set tag name // set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0}; char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id"); strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr); taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName; pCreateTbReq->ctb.tagName = tagName;
// set table name // set table name
pCreateTbReq->name = ctbName; pCreateTbReq->name = taosStrdup(ctbName);
ctbName = NULL;
tbData.pCreateTbReq = pCreateTbReq;
tbData.pCreateTbReq = pCreateTbReq; tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; } else {
} else { if (mr.me.type != TSDB_CHILD_TABLE) {
if (mr.me.type != TSDB_CHILD_TABLE) { tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, mr.me.type);
mr.me.type); metaReaderClear(&mr);
metaReaderClear(&mr); continue;
taosMemoryFree(ctbName); }
continue;
}
if (mr.me.ctbEntry.suid != suid) { if (mr.me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
", actual suid %" PRId64 "", ", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr);
continue;
}
tbData.uid = mr.me.uid;
tSimpleHashPut(pTask->tbSink.pUidInfo, ctbName, strlen(ctbName), &tbData.uid, sizeof(int64_t));
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
} }
tbData.uid = mr.me.uid;
metaReaderClear(&mr);
taosMemoryFreeClear(ctbName);
} }
// rows // rows
......
...@@ -366,6 +366,7 @@ typedef struct SStreamScanInfo { ...@@ -366,6 +366,7 @@ typedef struct SStreamScanInfo {
int8_t igCheckUpdate; int8_t igCheckUpdate;
int8_t igExpired; int8_t igExpired;
SStreamState* pState; SStreamState* pState;
char stbFullName[TSDB_TABLE_FNAME_LEN];
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {
...@@ -525,6 +526,7 @@ typedef struct SStreamPartitionOperatorInfo { ...@@ -525,6 +526,7 @@ typedef struct SStreamPartitionOperatorInfo {
int32_t tsColIndex; int32_t tsColIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
SSDataBlock* pCreateTbRes; SSDataBlock* pCreateTbRes;
char stbFullName[TSDB_TABLE_FNAME_LEN];
} SStreamPartitionOperatorInfo; } SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter { typedef struct SStreamFillSupporter {
...@@ -657,7 +659,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord ...@@ -657,7 +659,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData); int64_t* pData);
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, char* stbFullName);
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
......
...@@ -1486,7 +1486,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -1486,7 +1486,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { if (strcmp(pName, "_select_value") == 0) {
pValCtx[num++] = &pCtx[i]; pValCtx[num++] = &pCtx[i];
} else if (fmIsSelectFunc(pCtx[i].functionId)) { } else if (fmIsSelectFunc(pCtx[i].functionId)) {
p = &pCtx[i]; p = &pCtx[i];
......
...@@ -991,46 +991,48 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -991,46 +991,48 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
} }
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, char* stbFullName) {
void* pValue = NULL; void* pValue = NULL;
if (streamStateGetParName(pState, groupId, &pValue) != 0) { if (streamStateGetParName(pState, groupId, &pValue) != 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId; pTmpBlock->info.id.groupId = groupId;
char* tbName = pSrcBlock->info.parTbName; char* tbName = pSrcBlock->info.parTbName;
int32_t len = 0;
if (pTableSup->numOfExprs > 0) { if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
memset(tbName, 0, TSDB_TABLE_NAME_LEN); memset(tbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = 0;
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) { if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
len = 1; char* pTbName = buildCtbNameByGroupId(stbFullName, groupId);
tbName[0] = 0; len = TMIN(strlen(pTbName), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, pTbName, len);
} else { } else {
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1); void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len); memcpy(tbName, varDataVal(pData), len);
streamStatePutParName(pState, groupId, tbName);
} }
memcpy(pTmpBlock->info.parTbName, tbName, len);
pDestBlock->info.rows--; pDestBlock->info.rows--;
} else { } else {
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
colDataSetNULL(pTbNameCol, pDestBlock->info.rows); colDataSetNULL(pTbNameCol, pDestBlock->info.rows);
tbName[0] = 0; char* pTbName = buildCtbNameByGroupId(stbFullName, groupId);
len = TMIN(strlen(pTbName), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, pTbName, len);
} }
if (pTagSup->numOfExprs > 0) { if (pTagSup->numOfExprs > 0) {
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL); projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
pDestBlock->info.rows--; pDestBlock->info.rows--;
} else {
memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
} }
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false); colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
pDestBlock->info.rows++; pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock); blockDataDestroy(pTmpBlock);
pDestBlock->info.id.groupId = groupId;
memcpy(pDestBlock->info.parTbName, tbName, len);
streamStatePutParName(pState, groupId, tbName);
} else { } else {
memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN); memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
} }
...@@ -1039,8 +1041,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* ...@@ -1039,8 +1041,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info; SStreamPartitionOperatorInfo* pInfo = pOperator->info;
if ((pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || if (taosHashGetSize(pInfo->pPartitions) == 0) {
taosHashGetSize(pInfo->pPartitions) == 0) {
return NULL; return NULL;
} }
blockDataCleanup(pInfo->pCreateTbRes); blockDataCleanup(pInfo->pCreateTbRes);
...@@ -1051,7 +1052,7 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { ...@@ -1051,7 +1052,7 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte; SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0); int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes); pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes, pInfo->stbFullName);
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte); pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
} }
return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL; return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL;
...@@ -1120,6 +1121,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { ...@@ -1120,6 +1121,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
} break; } break;
default: default:
ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type");
printDataBlock(pBlock, "stream partitionby");
return pBlock; return pBlock;
} }
...@@ -1265,11 +1267,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1265,11 +1267,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
} }
} }
if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) { pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
} else {
pInfo->pCreateTbRes = NULL;
}
int32_t keyLen = 0; int32_t keyLen = 0;
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
...@@ -1295,6 +1293,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1295,6 +1293,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
pInfo->tsColIndex = 0; pInfo->tsColIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
memcpy(pInfo->stbFullName, pPartNode->stbFullTableName, strlen(pPartNode->stbFullTableName));
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
......
...@@ -1465,11 +1465,9 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, ...@@ -1465,11 +1465,9 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
blockDataCleanup(pInfo->pCreateTbRes); blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { if (!pInfo->partitionSup.needCalc) {
pBlock->info.parTbName[0] = 0;
} else {
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes); pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, pInfo->stbFullName);
} }
} }
...@@ -1583,7 +1581,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1583,7 +1581,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure. // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
...@@ -2486,6 +2484,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2486,6 +2484,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->igExpired = pTableScanNode->igExpired; pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN; pInfo->twAggSup.maxTs = INT64_MIN;
pInfo->pState = NULL; pInfo->pState = NULL;
memcpy(pInfo->stbFullName, pTableScanNode->stbFullTableName, strlen(pTableScanNode->stbFullTableName));
// for stream // for stream
if (pTaskInfo->streamInfo.pState) { if (pTaskInfo->streamInfo.pState) {
......
...@@ -3457,6 +3457,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3457,6 +3457,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pBlock; return pBlock;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
......
...@@ -393,6 +393,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -393,6 +393,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable); CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(igLastNull); COPY_SCALAR_FIELD(igLastNull);
COPY_SCALAR_FIELD(stbFullTableName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -508,6 +509,7 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog ...@@ -508,6 +509,7 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog
CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pPartitionKeys);
CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable); CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(stbFullTableName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1651,6 +1651,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags"; ...@@ -1651,6 +1651,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags";
static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanSubtable = "Subtable";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static const char* jkTableScanPhysiPlanStbFullTableName = "stbFullTableName";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
...@@ -1719,6 +1720,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1719,6 +1720,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate); code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkTableScanPhysiPlanStbFullTableName, pNode->stbFullTableName);
}
return code; return code;
} }
...@@ -1790,6 +1794,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { ...@@ -1790,6 +1794,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate); code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkTableScanPhysiPlanStbFullTableName, pNode->stbFullTableName);
}
return code; return code;
} }
...@@ -2454,6 +2461,7 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) { ...@@ -2454,6 +2461,7 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) {
static const char* jkStreamPartitionPhysiPlanTags = "Tags"; static const char* jkStreamPartitionPhysiPlanTags = "Tags";
static const char* jkStreamPartitionPhysiPlanSubtable = "Subtable"; static const char* jkStreamPartitionPhysiPlanSubtable = "Subtable";
static const char* jkStreamPartitionPhysiPlanStbFullTableName = "StbFullTableName";
static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) {
const SStreamPartitionPhysiNode* pNode = (const SStreamPartitionPhysiNode*)pObj; const SStreamPartitionPhysiNode* pNode = (const SStreamPartitionPhysiNode*)pObj;
...@@ -2465,6 +2473,9 @@ static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) { ...@@ -2465,6 +2473,9 @@ static int32_t physiStreamPartitionNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStreamPartitionPhysiPlanSubtable, nodeToJson, pNode->pSubtable); code = tjsonAddObject(pJson, jkStreamPartitionPhysiPlanSubtable, nodeToJson, pNode->pSubtable);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkStreamPartitionPhysiPlanStbFullTableName, pNode->stbFullTableName);
}
return code; return code;
} }
...@@ -2479,6 +2490,9 @@ static int32_t jsonToPhysiStreamPartitionNode(const SJson* pJson, void* pObj) { ...@@ -2479,6 +2490,9 @@ static int32_t jsonToPhysiStreamPartitionNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStreamPartitionPhysiPlanSubtable, &pNode->pSubtable); code = jsonToNodeObject(pJson, jkStreamPartitionPhysiPlanSubtable, &pNode->pSubtable);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkStreamPartitionPhysiPlanStbFullTableName, pNode->stbFullTableName);
}
return code; return code;
} }
......
...@@ -349,6 +349,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -349,6 +349,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->deleteMark = pCxt->pPlanCxt->deleteMark; pScan->deleteMark = pCxt->pPlanCxt->deleteMark;
pScan->igExpired = pCxt->pPlanCxt->igExpired; pScan->igExpired = pCxt->pPlanCxt->igExpired;
pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate; pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
pScan->stbFullTableName = pCxt->pPlanCxt->stbFullTableName;
} }
// set columns to scan // set columns to scan
...@@ -1152,6 +1153,7 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS ...@@ -1152,6 +1153,7 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
} else { } else {
nodesDestroyNode((SNode*)pPartition); nodesDestroyNode((SNode*)pPartition);
} }
pPartition->stbFullTableName = pCxt->pPlanCxt->stbFullTableName;
return code; return code;
} }
......
...@@ -584,6 +584,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp ...@@ -584,6 +584,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->igExpired = pScanLogicNode->igExpired; pTableScan->igExpired = pScanLogicNode->igExpired;
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate; pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
if (pScanLogicNode->stbFullTableName) {
strcpy(pTableScan->stbFullTableName, pScanLogicNode->stbFullTableName);
}
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1462,6 +1465,9 @@ static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList ...@@ -1462,6 +1465,9 @@ static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable); code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable);
} }
if (pPartLogicNode->stbFullTableName) {
strcpy(pPart->stbFullTableName, pPartLogicNode->stbFullTableName);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pPart; *pPhyNode = (SPhysiNode*)pPart;
} else { } else {
......
...@@ -1046,7 +1046,6 @@ _end: ...@@ -1046,7 +1046,6 @@ _end:
} }
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) {
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册