diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9ec80293a838f9ee825dc77e97f31b12d621caf6..5b0aaece99fa84db226310af6a87c2cf4d6f221d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1455,7 +1455,7 @@ typedef struct { static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo)); if (pRebInfo == NULL) { - goto _err; + return NULL; } strcpy(pRebInfo->key, key); pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t)); diff --git a/include/common/tname.h b/include/common/tname.h index ae2dc32335d8e33692a3c05c773d539e31c8a669..28f97d10285d0f620ef919db9e691c98e3a28197 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -16,8 +16,8 @@ #ifndef _TD_COMMON_NAME_H_ #define _TD_COMMON_NAME_H_ -#include "tdef.h" #include "tarray.h" +#include "tdef.h" #ifdef __cplusplus extern "C" { @@ -65,19 +65,19 @@ bool tNameDBNameEqual(SName* left, SName* right); typedef struct { // input - SArray *tags; // element is SSmlKV - const char *sTableName; // super table name - uint8_t sTableNameLen; // the length of super table name + SArray* tags; // element is SSmlKv + const char* sTableName; // super table name + uint8_t sTableNameLen; // the length of super table name // output - char *childTableName; // must have size of TSDB_TABLE_NAME_LEN; - uint64_t uid; // child table uid, may be useful + char* childTableName; // must have size of TSDB_TABLE_NAME_LEN; + uint64_t uid; // child table uid, may be useful } RandTableName; -void buildChildTableName(RandTableName *rName); +void buildChildTableName(RandTableName* rName); #ifdef __cplusplus } #endif -#endif /*_TD_COMMON_NAME_H_*/ +#endif /*_TD_COMMON_NAME_H_*/ diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e277622c40b60a0e96e81ba35f3711f652a98a0b..56e6a39ce8e45af9894e47144afd327cbae28885 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -16,6 +16,7 @@ #include "tdatablock.h" #include "tmsg.h" #include "tmsgcb.h" +#include "tqueue.h" #include "trpc.h" #ifdef __cplusplus @@ -154,6 +155,10 @@ struct SStreamTask { STaskDispatcherShuffle shuffleDispatcher; }; + // msg buffer + int32_t memUsed; + STaosQueue* inputQ; + // application storage void* ahandle; }; @@ -194,6 +199,8 @@ typedef struct { SArray* res; // SArray } SStreamSinkReq; +int32_t streamEnqueueData(SStreamTask* pTask, const void* input, int32_t inputType); + int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId); #ifdef __cplusplus diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d674b8286bf5c465da40557e23355a9e176599e9..c768e001c5479da84b5ffbaa2a516eeb4a9b620d 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -187,7 +187,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); - conf->autoCommit = false; + conf->autoCommit = true; conf->autoCommitInterval = 5000; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 62ba4bfb792e34a2c3d45fb300bcf0ead865aa97..56fbfed8fff294ac482f5ffe25a00192f4bb9880 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -18,11 +18,9 @@ #include "tcommon.h" #include "tstrbuild.h" -#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) +#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) -bool tscValidateTableNameLength(size_t len) { - return len < TSDB_TABLE_NAME_LEN; -} +bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; } #if 0 // TODO refactor @@ -95,12 +93,12 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in * but in case of DST, the start time of one day need to be dynamically decided. */ // todo refactor to extract function that is available for Linux/Windows/Mac platform - #if defined(WINDOWS) && _MSC_VER >= 1900 +#if defined(WINDOWS) && _MSC_VER >= 1900 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 int64_t timezone = _timezone; int32_t daylight = _daylight; char** tzname = _tzname; - #endif +#endif int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; start += timezone * t; @@ -142,10 +140,10 @@ int32_t tNameExtractFullName(const SName* name, char* dst) { int32_t tNameLen(const SName* name) { assert(name != NULL); - char tmp[12] = {0}; + char tmp[12] = {0}; int32_t len = sprintf(tmp, "%d", name->acctId); - int32_t len1 = (int32_t) strlen(name->dbname); - int32_t len2 = (int32_t) strlen(name->tname); + int32_t len1 = (int32_t)strlen(name->dbname); + int32_t len2 = (int32_t)strlen(name->tname); if (name->type == TSDB_DB_NAME_T) { assert(len2 == 0); @@ -200,9 +198,7 @@ const char* tNameGetTableName(const SName* name) { return &name->tname[0]; } -void tNameAssign(SName* dst, const SName* src) { - memcpy(dst, src, sizeof(SName)); -} +void tNameAssign(SName* dst, const SName* src) { memcpy(dst, src, sizeof(SName)); } int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t nameLen) { assert(dst != NULL && dbName != NULL && nameLen > 0); @@ -244,7 +240,6 @@ bool tNameDBNameEqual(SName* left, SName* right) { return (0 == strcmp(left->dbname, right->dbname)); } - int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { assert(dst != NULL && str != NULL && strlen(str) > 0); @@ -260,14 +255,14 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { if ((type & T_NAME_DB) == T_NAME_DB) { dst->type = TSDB_DB_NAME_T; - char* start = (char*)((p == NULL)? str:(p+1)); + char* start = (char*)((p == NULL) ? str : (p + 1)); int32_t len = 0; p = strstr(start, TS_PATH_DELIMITER); if (p == NULL) { - len = (int32_t) strlen(start); + len = (int32_t)strlen(start); } else { - len = (int32_t) (p - start); + len = (int32_t)(p - start); } // too long account id or too long db name @@ -275,21 +270,21 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return -1; } - memcpy (dst->dbname, start, len); + memcpy(dst->dbname, start, len); dst->dbname[len] = 0; } if ((type & T_NAME_TABLE) == T_NAME_TABLE) { dst->type = TSDB_TABLE_NAME_T; - char* start = (char*) ((p == NULL)? str: (p+1)); + char* start = (char*)((p == NULL) ? str : (p + 1)); // too long account id or too long db name - int32_t len = (int32_t) strlen(start); + int32_t len = (int32_t)strlen(start); if ((len >= tListLen(dst->tname)) || (len <= 0)) { return -1; } - memcpy (dst->tname, start, len); + memcpy(dst->tname, start, len); dst->tname[len] = 0; } @@ -305,14 +300,14 @@ static int compareKv(const void* p1, const void* p2) { if (res != 0) { return res; } else { - return kvLen1-kvLen2; + return kvLen1 - kvLen2; } } /* * use stable name and tags to grearate child table name */ -void buildChildTableName(RandTableName *rName) { +void buildChildTableName(RandTableName* rName) { int32_t size = taosArrayGetSize(rName->tags); ASSERT(size > 0); taosArraySort(rName->tags, compareKv); @@ -320,19 +315,19 @@ void buildChildTableName(RandTableName *rName) { SStringBuilder sb = {0}; taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen); for (int j = 0; j < size; ++j) { - SSmlKv *tagKv = taosArrayGetP(rName->tags, j); + SSmlKv* tagKv = taosArrayGetP(rName->tags, j); taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen); } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); T_MD5_CTX context; tMD5Init(&context); - tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + tMD5Update(&context, (uint8_t*)keyJoined, (uint32_t)len); tMD5Final(&context); uint64_t digest1 = *(uint64_t*)(context.digest); uint64_t digest2 = *(uint64_t*)(context.digest + 8); - snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); + snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016" PRIx64 "%016" PRIx64, digest1, digest2); taosStringBuilderDestroy(&sb); rName->uid = digest1; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a2c628b8a1e4dbdc4148cc16f4a8aea60dcc2fe5..8225eca6596918c784d4d98ea49eca291b0c538d 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -196,7 +196,9 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { return pVgEpNew; } -void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); } +void tDeleteSMqVgEp(SMqVgEp *pVgEp) { + if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg); +} int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 01149f793f3d8666c4409d46f064fdeac620f45f..b62de0e06e719c766d9e88cdb60639da3b15dcfc 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -298,6 +298,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } @@ -307,16 +309,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); return -1; } if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); return -1; } if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); return -1; } } else { @@ -331,6 +339,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFreeClear(topicObj.ast); + taosMemoryFreeClear(topicObj.sql); taosMemoryFreeClear(topicObj.physicalPlan); return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f607e0367e863b7e6a7532eb09cc9c1846ac8fda..6ca60945cdd05aba42830fa5b3bbd77c04aa4680 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -233,16 +233,18 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType != TDMT_VND_SUBMIT) return 0; - void* data = taosMemoryMalloc(msgLen); - if (data == NULL) { + // make sure msgType == TDMT_VND_SUBMIT + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { return -1; } - memcpy(data, msg, msgLen); - // make sure msgType == TDMT_VND_SUBMIT - if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { + if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; + + void* data = taosMemoryMalloc(msgLen); + if (data == NULL) { return -1; } + memcpy(data, msg, msgLen); SRpcMsg req = { .msgType = TDMT_VND_STREAM_TRIGGER,