提交 13affd9b 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

......@@ -232,7 +232,8 @@ void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
......
......@@ -1646,8 +1646,8 @@ _err:
return NULL;
}
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
// this message is sent from mnode to mnode(read thread to write thread),
// so there is no need for serialization or deserialization
typedef struct {
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
......
......@@ -142,6 +142,7 @@ typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
......
......@@ -1630,7 +1630,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
int32_t vgId) {
const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL;
// cal size
......@@ -1646,10 +1646,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
......@@ -1662,6 +1664,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
taosMemoryFree(cname);
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
......@@ -1697,7 +1700,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
......
......@@ -3702,6 +3702,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sourceDB) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
......@@ -3727,6 +3728,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->sourceDB) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
......
......@@ -127,7 +127,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
size_t tnameLen = strlen(name->tname);
if (tnameLen > 0) {
assert(name->type == TSDB_TABLE_NAME_T);
/*assert(name->type == TSDB_TABLE_NAME_T);*/
dst[len] = TS_PATH_DELIMITER[0];
memcpy(dst + len + 1, name->tname, tnameLen);
......@@ -314,9 +314,9 @@ void buildChildTableName(RandTableName* rName) {
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
if(IS_VAR_DATA_TYPE(tagKv->type)){
if (IS_VAR_DATA_TYPE(tagKv->type)) {
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->length);
}else{
} else {
taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length);
}
}
......
......@@ -206,6 +206,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
......@@ -248,6 +249,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
......@@ -325,6 +327,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
#endif
......
......@@ -456,7 +456,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto CREATE_STREAM_OVER;
}
pDb = mndAcquireDbByStream(pMnode, createStreamReq.name);
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
goto CREATE_STREAM_OVER;
......
......@@ -748,7 +748,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
SVnode* pVnode = (SVnode*)vnode;
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg msg = {
......
......@@ -231,12 +231,26 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode) != QUERY_NODE_VALUE) {
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode;
//param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of TOP/BOTTOM function can only be column");
}
//param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -247,6 +261,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pValue->notReserved = true;
//set result type
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
......@@ -745,19 +760,32 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_VAR_DATA_TYPE(pPara1->resType.type) || !IS_INTEGER_TYPE(para2Type)) {
SExprNode* pPara0 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 1);
uint8_t para1Type = p1->resType.type;
if (!IS_VAR_DATA_TYPE(pPara0->resType.type) || !IS_INTEGER_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (((SValueNode*)p1)->datum.i < 1) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(para3Type)) {
SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
uint8_t para2Type = p2->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
int64_t v = ((SValueNode*)p1)->datum.i;
if (v < 0 || v > INT16_MAX) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
}
pFunc->node.resType = (SDataType){.bytes = pPara1->resType.bytes, .type = pPara1->resType.type};
pFunc->node.resType = (SDataType){.bytes = pPara0->resType.bytes, .type = pPara0->resType.type};
return TSDB_CODE_SUCCESS;
}
......
......@@ -1120,12 +1120,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
NEXT_TOKEN(pCxt->pSql, sToken);
SName name;
createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
tNameExtractFullName(&name, tbFName);
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
// USING cluase
// USING clause
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
......
......@@ -3386,9 +3386,9 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->igExists = pStmt->ignoreExists;
SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
// tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
tNameGetFullDbName(&name, pReq->name);
// tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
if ('\0' != pStmt->targetTabName[0]) {
strcpy(name.dbname, pStmt->targetDbName);
......
......@@ -505,6 +505,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (pTask->sinkType == TASK_SINK__TABLE) {
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
......@@ -551,6 +552,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (pTask->sinkType == TASK_SINK__TABLE) {
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册