diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6fc6557c11fde9e5c692fba801ab1f4..0b5f3be1b0e3365300b545c5c9897109b10b6849 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) { int32_t numOfFields = taos_field_count(msg); taos_print_row(buf, row, fields, numOfFields); printf("%s\n", buf); + + const char* tbName = tmq_get_table_name(msg); + if (tbName) { + printf("from tb: %s\n", tbName); + } } } @@ -101,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -161,6 +166,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ + tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); diff --git a/include/client/taos.h b/include/client/taos.h index 26d4d18234ccd4de946f8a5cc22a517842a3a63b..486d5f5fefd714097e7400d14181bec9d986c026 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); -// TODO -#if 0 -DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); -#endif +DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); #if 0 DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7f17c1673bfbc7ae04114a5ba06b077f7c55a241..ae21986c56fb30f07438d33dd93dfecc0d88503e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -252,6 +252,7 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); typedef struct { + int32_t code; int8_t hashMeta; int64_t uid; char* tblFName; @@ -271,7 +272,7 @@ typedef struct { int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); -void tFreeSSubmitRsp(SSubmitRsp *pRsp); +void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SMA_ON ((int8_t)0x1) #define COL_IDX_ON ((int8_t)0x2) @@ -2380,6 +2381,7 @@ typedef struct { typedef struct { SMsgHead head; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int8_t withTbName; int32_t epoch; uint64_t reqId; int64_t consumerId; @@ -2489,6 +2491,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); tlen += taosEncodeSSchemaWrapper(buf, pSW); } + if (pRsp->withTbName) { + char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i); + tlen += taosEncodeString(buf, tbName); + } } } return tlen; @@ -2501,6 +2507,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI32(buf, &pRsp->blockNum); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); if (pRsp->blockNum != 0) { buf = taosDecodeFixedI8(buf, &pRsp->withTbName); @@ -2519,6 +2526,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); } + if (pRsp->withTbName) { + char* name = NULL; + buf = taosDecodeString(buf, &name); + taosArrayPush(pRsp->blockTbName, &name); + } } } return (void*)buf; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 8e918c40f921138bfa2fc2e99c226f7be384c537..c7deaa78451b0d8977ac842b0c24d37eb7a9e9f4 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -217,6 +217,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index bdccd29acfb8c934164bfe831aef8edafd11c992..b5c38e14f49f9ed84555d24ba3d5cd42c8ac6f5e 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -39,15 +39,6 @@ extern "C" { //====================================================================================== //begin API to taosd and qworker -enum { - UDFC_CODE_STOPPING = -1, - UDFC_CODE_PIPE_READ_ERR = -2, - UDFC_CODE_CONNECT_PIPE_ERR = -3, - UDFC_CODE_LOAD_UDF_FAILURE = -4, - UDFC_CODE_INVALID_STATE = -5, - UDFC_CODE_NO_PIPE = -6, -}; - typedef void *UdfcFuncHandle; /** @@ -89,6 +80,7 @@ typedef struct SUdfColumnData { typedef struct SUdfColumn { SUdfColumnMeta colMeta; + bool hasNull; SUdfColumnData colData; } SUdfColumn; @@ -232,6 +224,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) { } else { udfColDataSetNull_f(pColumn, row); } + pColumn->hasNull = true; } static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 52b2f0c67033779adac885daa66d58872549ffb3..58cdd2cab421af90104326afee620944f9646257 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -645,7 +645,15 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801) #define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802) #define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803) -#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2604) +#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804) + +//udf +#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901) +#define TSDB_CODE_UDF_PIPE_READ_ERR TAOS_DEF_ERROR_CODE(0, 0x2902) +#define TSDB_CODE_UDF_PIPE_CONNECT_ERR TAOS_DEF_ERROR_CODE(0, 0x2903) +#define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904) +#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) +#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001) diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 3f00c79f46f28826177c5a38f32b5eb0daa69c9b..74e64d5292e9232d803762ec528554be0a9ae975 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -59,6 +59,21 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) { static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); } +// --- Bool + +static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) { + if (buf != NULL) { + ((int8_t *)(*buf))[0] = value ? 1 : 0; + *buf = POINTER_SHIFT(*buf, sizeof(int8_t)); + } + return (int32_t)sizeof(int8_t); +} + +static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) { + *value = ((int8_t *)buf)[0] == 0 ? false : true; + return POINTER_SHIFT(buf, sizeof(int8_t)); +} + // ---- Fixed U16 static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) { if (buf != NULL) { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0ce689f19cd8f2579047f9e49eca0ca69609d1f5..a6b8b842f94ee51825fe323778ebd6e6ae149c64 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -57,16 +57,17 @@ struct tmq_topic_vgroup_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[TSDB_CGROUP_LEN]; - int8_t autoCommit; - int8_t resetOffset; - uint16_t port; - int32_t autoCommitInterval; - char* ip; - char* user; - char* pass; - char* db; + char clientId[256]; + char groupId[TSDB_CGROUP_LEN]; + int8_t autoCommit; + int8_t resetOffset; + int8_t withTbName; + uint16_t port; + int32_t autoCommitInterval; + char* ip; + char* user; + char* pass; + /*char* db;*/ tmq_commit_cb* commitCb; void* commitCbUserParam; }; @@ -75,6 +76,7 @@ struct tmq_t { // conf char groupId[TSDB_CGROUP_LEN]; char clientId[256]; + int8_t withTbName; int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; @@ -187,6 +189,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); + conf->withTbName = -1; conf->autoCommit = true; conf->autoCommitInterval = 5000; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; @@ -240,6 +243,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } + if (strcmp(key, "msg.with.table.name") == 0) { + if (strcmp(value, "true") == 0) { + conf->withTbName = 1; + } else if (strcmp(value, "false") == 0) { + conf->withTbName = 0; + } else if (strcmp(value, "none") == 0) { + conf->withTbName = -1; + } else { + return TMQ_CONF_INVALID; + } + } + if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; @@ -257,7 +272,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } if (strcmp(key, "td.connect.db") == 0) { - conf->db = strdup(value); + /*conf->db = strdup(value);*/ return TMQ_CONF_OK; } @@ -485,6 +500,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); + pTmq->withTbName = conf->withTbName; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commitCb = conf->commitCb; @@ -1104,6 +1120,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pReq->subKey[tlen] = TMQ_SEPARATOR; strcpy(pReq->subKey + tlen + 1, pTopic->topicName); + pReq->withTbName = tmq->withTbName; pReq->waitTime = waitTime; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; @@ -1346,3 +1363,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { return -1; } } + +const char* tmq_get_table_name(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; + } + const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + return name; + } + return NULL; +} diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b58e4bd1ddd8a8c1386167b76ccdc0f8b72c7a45..90531019384d4504392c59b480565d972ff19bcf 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1311,6 +1311,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { tlen += taosEncodeFixedI16(buf, pColData->info.colId); tlen += taosEncodeFixedI16(buf, pColData->info.type); tlen += taosEncodeFixedI32(buf, pColData->info.bytes); + tlen += taosEncodeFixedBool(buf, pColData->hasNull); if (IS_VAR_DATA_TYPE(pColData->info.type)) { tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows); @@ -1340,6 +1341,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedI16(buf, &data.info.colId); buf = taosDecodeFixedI16(buf, &data.info.type); buf = taosDecodeFixedI32(buf, &data.info.bytes); + buf = taosDecodeFixedBool(buf, &data.hasNull); if (IS_VAR_DATA_TYPE(data.info.type)) { buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t)); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ebd81c7da31573a28067ebd8d1acf533c1cd6b85..021ee8455eca5b97e7de95b6959e4ea6674eecdd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4032,6 +4032,7 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1; if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; if (pBlock->hashMeta) { if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; @@ -4047,10 +4048,11 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1; if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; if (pBlock->hashMeta) { if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; - pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); + pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); if (NULL == pBlock->tblFName) return -1; if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; } @@ -4089,7 +4091,7 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1; } - tEndDecode(pDecoder); + tEndDecode(pDecoder); tDecoderClear(pDecoder); return 0; } @@ -4108,4 +4110,3 @@ void tFreeSSubmitRsp(SSubmitRsp *pRsp) { taosMemoryFree(pRsp); } - diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ca0ae111a390e29d8d70111a437cea08c7f9dec0..12e89277f428089f101887f51526755c39a2f209 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -318,6 +318,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->version = pNew->version; pOld->nextColId = pNew->nextColId; + pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; pOld->numOfTags = pNew->numOfTags; memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema)); @@ -832,7 +833,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) { } static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { - if (pAlter->commentLen != 0) return 0; + if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0; if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; @@ -883,7 +884,8 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) { return 0; } -static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen) { +static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen, + int32_t ttl) { if (commentLen > 0) { pNew->commentLen = commentLen; pNew->comment = taosMemoryCalloc(1, commentLen); @@ -893,6 +895,9 @@ static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pCo } memcpy(pNew->comment, pComment, commentLen); } + if (ttl >= 0) { + pNew->ttl = ttl; + } if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; @@ -1232,7 +1237,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq * code = mndAlterStbColumnBytes(pOld, &stbObj, pField0); break; case TSDB_ALTER_TABLE_UPDATE_OPTIONS: - code = mndUpdateStbComment(pOld, &stbObj, pAlter->comment, pAlter->commentLen); + code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl); break; default: terrno = TSDB_CODE_OPS_NOT_SUPPORT; @@ -1723,7 +1728,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables - char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures + char *p = taosMemoryCalloc(1, pStb->commentLen + 1 + VARSTR_HEADER_SIZE); // check malloc failures if (p != NULL) { if (pStb->commentLen != 0) { STR_TO_VARSTR(p, pStb->comment); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 41d4d5f40651ebe85917550c17f5c720569dbc6e..00379ecda1ed87bf7eb06f36423d36473d425aff 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.subType = TOPIC_SUB_TYPE__TABLE; - topicObj.withTbName = 0; - topicObj.withSchema = 0; + topicObj.withTbName = pCreate->withTbName; + topicObj.withSchema = pCreate->withSchema; SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 1e271bc8d879549c178498393d08479667d666ea..f1917d5fea02f126850495aed62496aa62f7f1a0 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -116,10 +116,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); -int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); -int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); -int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); -int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); +// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); +int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); +int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); +int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); #endif #endif @@ -128,4 +128,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); } #endif -#endif /*_TD_VNODE_META_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_META_H_*/ diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 8d2a4ebcf3236010cea6d4f2ed9b5f6e53924f43..7d0a87d79dae0713004f81193d08e600cc5ac1a5 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo skmDbKey.sver = sver; pKey = &skmDbKey; kLen = sizeof(skmDbKey); + metaRLock(pMeta); ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen); + metaULock(pMeta); if (ret < 0) { return NULL; } @@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo } struct SMCtbCursor { + SMeta *pMeta; TDBC *pCur; tb_uid_t suid; void *pKey; @@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { return NULL; } + pCtbCur->pMeta = pMeta; pCtbCur->suid = uid; + metaRLock(pMeta); + ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL); if (ret < 0) { + metaULock(pMeta); taosMemoryFree(pCtbCur); return NULL; } @@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { if (pCtbCur) { + if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta); if (pCtbCur->pCur) { tdbDbcClose(pCtbCur->pCur); @@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { pSW = metaGetTableSchema(pMeta, quid, sver, 0); if (!pSW) return NULL; - + tdInitTSchemaBuilder(&sb, 0); for (int i = 0; i < pSW->nCols; i++) { pSchema = pSW->pSchema + i; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b49c148ac5a050c81ec1e3198e828a214b8482c4..0cdefb3cffc0f7528cd25c2b1b015ca4016fa175 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -427,9 +427,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.withSchema = pExec->withSchema; + rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockSchema = taosArrayInit(0, sizeof(void*)); + rsp.blockTbName = taosArrayInit(0, sizeof(void*)); + + int8_t withTbName = pExec->withTbName; + if (pReq->withTbName != -1) { + withTbName = pReq->withTbName; + } + rsp.withTbName = withTbName; while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); @@ -535,6 +543,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockSchema, &pSW); } + if (withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + if (metaGetTableEntryByUid(&mr, uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } + rsp.blockNum++; } // db subscribe @@ -563,6 +583,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(actualLen <= dataStrLen); taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockData, &buf); + if (withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); taosArrayPush(rsp.blockSchema, &pSW); @@ -614,6 +644,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 3521e9b1f52db6ab21ce10a65daf7bec153e873a..c1813c787ad0e1465b0007632636301217c9eda0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -486,8 +486,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { for (int i = 1; i < pCommith->niters; i++) { tSkipListDestroyIter(pCommith->iters[i].pIter); - tdFreeSchema(pCommith->iters[i].pTable->pSchema); - taosMemoryFree(pCommith->iters[i].pTable); + if (pCommith->iters[i].pTable) { + tdFreeSchema(pCommith->iters[i].pTable->pSchema); + taosMemoryFreeClear(pCommith->iters[i].pTable); + } } taosMemoryFree(pCommith->iters); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d40a73eb67af9efaa482fcdb799796adfb3c0f5c..037b099345b91f680b27a0ae251a9bb8299236b3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -62,6 +62,16 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) { if (pMemTable) { taosHashCleanup(pMemTable->pHashIdx); + SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx); + SSkipListNode *pNode = NULL; + STbData *pTbData = NULL; + for (;;) { + if (!tSkipListIterNext(pIter)) break; + pNode = tSkipListIterGet(pIter); + pTbData = (STbData *)pNode->pData; + tsdbFreeTbData(pTbData); + } + tSkipListDestroyIter(pIter); tSkipListDestroy(pMemTable->pSlIdx); taosMemoryFree(pMemTable); } @@ -300,6 +310,17 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo TSKEY keyMax; SSubmitBlk *pBlkCopy; + // check if table exists + SMetaReader mr = {0}; + SMetaEntry me = {0}; + metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) { + metaReaderClear(&mr); + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; + return -1; + } + metaReaderClear(&mr); + // create container is nedd tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid)); if (tptr == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d2650725f058cd30efda16e6279429bda81ee07f..4f76bc5386d88b56ebe7575ef848066591a01614 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -502,7 +502,7 @@ _exit: return 0; } -static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { +static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { SSubmitBlkIter blkIter = {0}; STSchema *pSchema = NULL; tb_uid_t suid = 0; @@ -544,7 +544,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char while (true) { if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; - + vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); } @@ -601,7 +601,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - pRsp->code = terrno; + submitBlkRsp.code = terrno; tDecoderClear(&decoder); goto _exit; } @@ -623,8 +623,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in } if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { - pRsp->code = terrno; - goto _exit; + submitBlkRsp.code = terrno; } submitRsp.numOfRows += submitBlkRsp.numOfRows; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1260f9a3e7cc121819599b5d3e149ba035bebdc3..f0f5338c4d22e711ade5d474c69fd917378c785f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t ret = 0; SMsgCb *pMsgCb = rpcHandle; if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { + pMsg->noResp = 1; tmsgSendReq(rpcHandle, pEpSet, pMsg); } else { vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 93e81aa70ee2d9d7a3658adc86d4bd3aa55315ad..4881f231348ea0a9cc96d1f4e0a794fa1f52ba08 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -616,10 +616,10 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); -void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); +void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 33f0c440eced482bb7d7f345455f61d0d49c214e..a5bc1fdf5838b2a5aa987924cc325a4f8b036d5f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, +static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx); @@ -579,7 +579,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } -void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pWin->skey; @@ -618,9 +618,14 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pEntryInfo->numOfRes = 1; continue; } - + int32_t code = TSDB_CODE_SUCCESS; if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { - pCtx[k].fpSet.process(&pCtx[k]); + code = pCtx[k].fpSet.process(&pCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + longjmp(taskInfo->env, code); + } } // restore it @@ -806,7 +811,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction // this can be set during create the struct // todo add a dummy funtion to avoid process check if (pCtx[k].fpSet.process != NULL) { - pCtx[k].fpSet.process(&pCtx[k]); + int32_t code = pCtx[k].fpSet.process(&pCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s call aggregate function error happens, code : %s", + GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); + pOperator->pTaskInfo->code = code; + longjmp(pOperator->pTaskInfo->env, code); + } } } } @@ -2176,7 +2187,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p * @param pQInfo * @param result */ -int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, +int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -2215,8 +2226,14 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased int32_t slotId = pExprInfo[j].base.resSchema.slotId; pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); - if (pCtx[j].fpSet.process) { - pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (pCtx[j].fpSet.finalize) { + int32_t code = TSDB_CODE_SUCCESS; + code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code)) { + qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + longjmp(taskInfo->env, code); + } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor } else { @@ -2243,7 +2260,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased return 0; } -void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, +void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); @@ -2257,7 +2274,7 @@ void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo } int32_t orderType = TSDB_ORDER_ASC; - doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); + doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); @@ -3749,7 +3766,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e3a507bf7cdc7e035039042e6878e004d525b2d5..5d22c13ec6a0394c6d83adfb5104cb12b92d3f44 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -234,7 +234,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); @@ -252,7 +252,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); } } @@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false); while(1) { - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 738f4821bde2dd153a4b38457e31bab5977510ba..0f3b1bda20a7d02106f6951bebdb8b65536694a4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -703,7 +703,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe pInfo->order, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; @@ -740,7 +740,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe pInfo->order, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -855,7 +855,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window @@ -874,7 +874,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -888,7 +888,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -921,7 +921,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -948,7 +948,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -998,7 +998,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -1035,7 +1035,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); // TODO: remove for stream /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ @@ -1233,7 +1233,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window @@ -1252,7 +1252,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -1265,7 +1265,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -1298,7 +1298,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index f5ee29603e95ab4062c443a6f969f04dbbc5b184..087e2434970b76a7e2bb09836fed988a22b86122 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -95,6 +95,9 @@ bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); int32_t stateDurationFunction(SqlFunctionCtx* pCtx); +bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t csumFunction(SqlFunctionCtx* pCtx); + bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); #ifdef __cplusplus diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 00a87ab2367f16bbb56cad8b7ddab788404f1ac8..e165810e3034ce9557443fba6cadea4a1c8391d7 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -308,6 +308,37 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32 return TSDB_CODE_SUCCESS; } +static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return TSDB_CODE_SUCCESS; + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of CSUM function can only be column"); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + uint8_t resType; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } else { + if (IS_SIGNED_NUMERIC_TYPE(colType)) { + resType = TSDB_DATA_TYPE_BIGINT; + } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) { + resType = TSDB_DATA_TYPE_UBIGINT; + } else if (IS_FLOAT_TYPE(colType)) { + resType = TSDB_DATA_TYPE_DOUBLE; + } else { + ASSERT(0); + } + } + + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -742,6 +773,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = stateDurationFunction, .finalizeFunc = NULL }, + { + .name = "csum", + .type = FUNCTION_TYPE_CSUM, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateCsum, + .getEnvFunc = getCsumFuncEnv, + .initFunc = functionSetup, + .processFunc = csumFunction, + .finalizeFunc = NULL + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index aed576a68775e86a028848ddc3bda41a2da66671..ca470a05217c677fe55811374f8e22e64d5a8c3f 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2818,7 +2818,6 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; int32_t numOfElems = 0; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; @@ -2856,7 +2855,6 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { TSKEY* tsList = (int64_t*)pInput->pPTS->pData; SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; int32_t numOfElems = 0; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; @@ -2896,3 +2894,58 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { return numOfElems; } + +bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SSumRes); + return true; +} + +int32_t csumFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t numOfElems = 0; + int32_t type = pInputCol->info.type; + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + //colDataAppendNULL(pOutput, i); + continue; + } + + char* data = colDataGetData(pInputCol, i); + if (IS_SIGNED_NUMERIC_TYPE(type)) { + int64_t v; + GET_TYPED_DATA(v, int64_t, type, data); + pSumRes->isum += v; + colDataAppend(pOutput, pos, (char *)&pSumRes->isum, false); + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + uint64_t v; + GET_TYPED_DATA(v, uint64_t, type, data); + pSumRes->usum += v; + colDataAppend(pOutput, pos, (char *)&pSumRes->usum, false); + } else if (IS_FLOAT_TYPE(type)) { + double v; + GET_TYPED_DATA(v, double, type, data); + pSumRes->dsum += v; + colDataAppend(pOutput, pos, (char *)&pSumRes->dsum, false); + } + + //TODO: remove this after pTsOutput is handled + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + + numOfElems++; + } + + return numOfElems; +} diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index a577ea200fa5e41cef7bae9894bfe769fcce996a..dfa7fac15ad628e16de391553a55bd2749896a0d 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -695,6 +695,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colMeta.scale = col->info.scale; udfCol->colMeta.precision = col->info.precision; udfCol->colData.numOfRows = udfBlock->numOfRows; + udfCol->hasNull = col->hasNull; if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); @@ -731,6 +732,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { col->info.bytes = meta->bytes; col->info.scale = meta->scale; col->info.type = meta->type; + col->hasNull = udfCol->hasNull; SUdfColumnData *data = &udfCol->colData; if (!IS_VAR_DATA_TYPE(meta->type)) { @@ -929,7 +931,7 @@ void udfcUvHandleError(SClientUvConn *conn) { while (!QUEUE_EMPTY(&conn->taskQueue)) { QUEUE* h = QUEUE_HEAD(&conn->taskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); - task->errCode = UDFC_CODE_PIPE_READ_ERR; + task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR; QUEUE_REMOVE(&task->connTaskQueue); QUEUE_REMOVE(&task->procTaskQueue); uv_sem_post(&task->taskSem); @@ -1117,7 +1119,7 @@ void cleanUpUvTasks(SUdfdProxy *udfc) { QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); if (udfc->gUdfcState == UDFC_STATE_STOPPING) { - task->errCode = UDFC_CODE_STOPPING; + task->errCode = TSDB_CODE_UDF_STOPPING; } uv_sem_post(&task->taskSem); } @@ -1127,7 +1129,7 @@ void cleanUpUvTasks(SUdfdProxy *udfc) { QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); if (udfc->gUdfcState == UDFC_STATE_STOPPING) { - task->errCode = UDFC_CODE_STOPPING; + task->errCode = TSDB_CODE_UDF_STOPPING; } uv_sem_post(&task->taskSem); } @@ -1211,7 +1213,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { fnInfo("udfc setup udf. udfName: %s", udfName); if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) { - return UDFC_CODE_INVALID_STATE; + return TSDB_CODE_UDF_INVALID_STATE; } SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask)); task->errCode = 0; @@ -1225,7 +1227,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); if (errCode != 0) { fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName); - return UDFC_CODE_CONNECT_PIPE_ERR; + return TSDB_CODE_UDF_PIPE_CONNECT_ERR; } udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); @@ -1252,7 +1254,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf SClientUdfUvSession *session = (SClientUdfUvSession *) handle; if (session->udfUvPipe == NULL) { fnError("No pipe to udfd"); - return UDFC_CODE_NO_PIPE; + return TSDB_CODE_UDF_PIPE_NO_PIPE; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); task->errCode = 0; @@ -1372,7 +1374,7 @@ int32_t teardownUdf(UdfcFuncHandle handle) { SClientUdfUvSession *session = (SClientUdfUvSession *) handle; if (session->udfUvPipe == NULL) { fnError("pipe to udfd does not exist"); - return UDFC_CODE_NO_PIPE; + return TSDB_CODE_UDF_PIPE_NO_PIPE; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 0ad4674cfa8a01f7e805afe99899b6bbb121b0cc..f006695f14573309d1a5405860466850346429e5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -102,7 +102,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { int err = uv_dlopen(udf->path, &udf->lib); if (err != 0) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - return UDFC_CODE_LOAD_UDF_FAILURE; + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0}; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index cb539e837929bf7b8dcfd958f4caaac805d760e0..8a21eea7b7c9ce2254ce0962c8a0b67489f06e8c 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -271,6 +271,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 83f31c5dac0815b8d8e0f16048a5a8bb0a2b6e66..159af1610e72a7b96f772e11e1589ce9b3e4498f 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -57,6 +57,11 @@ SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b); SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b); void syncUtilMsgHtoN(void* msg); void syncUtilMsgNtoH(void* msg); +bool syncUtilIsData(tmsg_t msgType); +bool syncUtilUserPreCommit(tmsg_t msgType); +bool syncUtilUserCommit(tmsg_t msgType); +bool syncUtilUserRollback(tmsg_t msgType); + #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 6623ed1caa3eed6a7fc4cc98654bf5fb6dbc172e..aed19d042e54195ab98d5bd86a38bd8f5fa2be03 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -19,6 +19,7 @@ #include "syncRaftStore.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "syncRaftCfg.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == @@ -199,7 +200,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + //if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + if (syncUtilUserRollback(pRollBackEntry->msgType)) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -227,7 +229,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; cbMeta.isWeak = pAppendEntry->isWeak; @@ -258,7 +261,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; cbMeta.isWeak = pAppendEntry->isWeak; @@ -320,7 +324,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; @@ -330,6 +335,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); } + // config change + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); + + syncNodeUpdateConfig(ths, &newSyncCfg); + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5d6cbd2a586831cf6be70e380abccf16898345ea..620f0e9cd20d8f4738d9bf3cfdf7a985f7493b65 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -19,6 +19,7 @@ #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" +#include "syncRaftCfg.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -101,7 +102,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; @@ -111,6 +113,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); } + // config change + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); + + syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 911d8384f076a6ab36dfddfceeec623b794afab9..da23d8415bdc6f482f3db915487d8029954e96e2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -116,6 +116,15 @@ void syncStop(int64_t rid) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; + char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; + rpcMsg.noResp = 1; + rpcMsg.contLen = strlen(configChange) + 1; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange); + taosMemoryFree(configChange); + ret = syncPropose(rid, &rpcMsg, false); return ret; } @@ -849,6 +858,35 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { + pSyncNode->pRaftCfg->cfg = *newConfig; + int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); + ASSERT(ret == 0); + + // init internal + pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; + syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + + // init peersNum, peers, peersId + pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; + int j = 0; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + if (i != pSyncNode->pRaftCfg->cfg.myIndex) { + pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; + j++; + } + } + for (int i = 0; i < pSyncNode->peersNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + } + + // init replicaNum, replicasId + pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + } +} + SSyncNode* syncNodeAcquire(int64_t rid) { SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid); if (pNode == NULL) { @@ -1207,7 +1245,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; @@ -1228,7 +1267,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 3110c0b2a338e04d8aec6bf61ea5f9019234c69d..cf045a692611a64e75c2f4c595180f1e324e75f9 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -212,4 +212,32 @@ void syncUtilMsgNtoH(void* msg) { SMsgHead* pHead = msg; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); +} + +bool syncUtilIsData(tmsg_t msgType) { + if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) { + return false; + } + return true; +} + +bool syncUtilUserPreCommit(tmsg_t msgType) { + if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + return true; + } + return false; +} + +bool syncUtilUserCommit(tmsg_t msgType) { + if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + return true; + } + return false; +} + +bool syncUtilUserRollback(tmsg_t msgType) { + if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + return true; + } + return false; } \ No newline at end of file diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index cf7dd5010343fa86a3c322492771ce20083e226f..fffda68731cb15da91669c72913f776cb67496ca 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -114,6 +114,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB int tdbBtreeClose(SBTree *pBt) { if (pBt) { + tdbFree(pBt->pBuf); tdbOsFree(pBt); } return 0; diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index aa056874267f7f09f4896d6429af00598e09a915..8574e071f2a9b4d230a75d5ccb7d4a5b73bbbf24 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { int h; h = tdbPCachePageHash(&(pPage->pgid)); - for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) + for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) ; ASSERT(*ppPage == pPage); *ppPage = pPage->pHashNext; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 0cfe75bf333a0abaf0684fd05ce8a20adf811170..b04dea0213add6a24caba63b456d7f0ee4fe6b2d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -65,11 +65,18 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); + wError("failed to seek idx file, ver %ld, pos: %ld, since %s", ver, offset, terrstr()); return -1; } SWalIdxEntry entry; - if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) { + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("failed to read idx file, since %s", terrstr()); + } else { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + wError("read idx file incompletely, read bytes %d, bytes should be %lu", ret, sizeof(SWalIdxEntry)); + } return -1; } @@ -77,6 +84,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); + wError("failed to seek log file, ver %ld, pos: %ld, since %s", ver, entry.offset, terrstr()); return -1; } return ret; @@ -92,6 +100,8 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); if (pLogTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TSDB_CODE_WAL_INVALID_VER; + wError("cannot open file %s, since %s", fnameStr, terrstr()); return -1; } @@ -99,6 +109,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); if (pIdxTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); + wError("cannot open file %s, since %s", fnameStr, terrstr()); return -1; } @@ -113,6 +124,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { return 0; } if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { + wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer); terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } @@ -125,11 +137,13 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); if (pRead->curFileFirstVer != pRet->firstVer) { + // error code set inner if (walReadChangeFile(pRead, pRet->firstVer) < 0) { return -1; } } + // error code set inner if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { return -1; } @@ -154,9 +168,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { if (code < 0) return -1; } - if (!taosValidFile(pRead->pReadLogTFile)) { - return -1; - } + ASSERT(taosValidFile(pRead->pReadLogTFile) == true); code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); if (code != sizeof(SWalHead)) { @@ -249,16 +261,19 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { // TODO: check wal life if (pRead->curVersion != ver) { if (walReadSeekVer(pRead, ver) < 0) { + wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr()); return -1; } } - if (!taosValidFile(pRead->pReadLogTFile)) { - return -1; - } + ASSERT(taosValidFile(pRead->pReadLogTFile) == true); code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); if (code != sizeof(SWalHead)) { + if (code < 0) + terrno = TAOS_SYSTEM_ERROR(errno); + else + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index da1c36dcc42ed3d1d3d454856cf4e5d3c1b2e8bc..2e439975849fd38adc7e77ed061af4219bc1fd73 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -225,6 +225,7 @@ int walRoll(SWal *pWal) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + // terrno set inner code = walRollFileInfo(pWal); if (code != 0) { return -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e32ecfc69522f418633fbe537bd5b70abe275400..9676e4e1f99be5c9f76f7a757053aa4995d7f713 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -454,6 +454,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied") //planner TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "planner internal error") +//udf +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_READ_ERR, "udf pipe read error") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect error") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") + //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type") diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 19a5d3ecc1e314cbbb52f7c5249afd62c3303928..adbab04e07e31617b3f0aaf7abaa2f2a3c59b3dc 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -150,6 +150,9 @@ class TDSql: raise Exception(repr(e)) return (self.queryRows, timeout) + def getRows(self): + return self.queryRows + def checkRows(self, expectRows): if self.queryRows == expectRows: tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows)) diff --git a/tests/script/tsim/stable/alter1.sim b/tests/script/tsim/stable/alter1.sim index 5cee10756c97d8c4f9aa44d0b06c1608dc5f420a..1205f50f6ea144de6f5fae06ef7569a60b47e0cb 100644 --- a/tests/script/tsim/stable/alter1.sim +++ b/tests/script/tsim/stable/alter1.sim @@ -159,6 +159,7 @@ sql alter table db.stb rename tag t1 tx print ========== alter common sql alter table db.stb comment 'abcde' ; +sql alter table db.stb ttl 10 ; sql show db.stables; if $data[0][6] != abcde then diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index d8050c53c54d59a7105d41be2f320df14f9e7456..d3da4f2c596c45efce438839f544a35e4980f898 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -346,8 +346,10 @@ class TDTestCase: return def test_case3(self): + self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000) + # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000) + # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000) - self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*1000) # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000) # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000) diff --git a/tests/system-test/1-insert/manyVgroups.json b/tests/system-test/1-insert/manyVgroups.json index 5487dff70821f2b1a7d99be37ab3c77c83d9be6f..e6719aedc988c2f67c70210423bb5c9d6c573434 100644 --- a/tests/system-test/1-insert/manyVgroups.json +++ b/tests/system-test/1-insert/manyVgroups.json @@ -29,8 +29,8 @@ "batch_create_tbl_num": 50000, "data_source": "rand", "insert_mode": "taosc", - "insert_rows": 0, - "interlace_rows": 0, + "insert_rows": 10, + "interlace_rows": 100000, "insert_interval": 0, "max_sql_len": 10000000, "disorder_ratio": 0,