未验证 提交 951d102a 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12313 from taosdata/feature/tq

feat(tmq): change default config
......@@ -1455,7 +1455,7 @@ typedef struct {
static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo));
if (pRebInfo == NULL) {
goto _err;
return NULL;
}
strcpy(pRebInfo->key, key);
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
......
......@@ -16,8 +16,8 @@
#ifndef _TD_COMMON_NAME_H_
#define _TD_COMMON_NAME_H_
#include "tdef.h"
#include "tarray.h"
#include "tdef.h"
#ifdef __cplusplus
extern "C" {
......@@ -65,19 +65,19 @@ bool tNameDBNameEqual(SName* left, SName* right);
typedef struct {
// input
SArray *tags; // element is SSmlKV
const char *sTableName; // super table name
uint8_t sTableNameLen; // the length of super table name
SArray* tags; // element is SSmlKv
const char* sTableName; // super table name
uint8_t sTableNameLen; // the length of super table name
// output
char *childTableName; // must have size of TSDB_TABLE_NAME_LEN;
uint64_t uid; // child table uid, may be useful
char* childTableName; // must have size of TSDB_TABLE_NAME_LEN;
uint64_t uid; // child table uid, may be useful
} RandTableName;
void buildChildTableName(RandTableName *rName);
void buildChildTableName(RandTableName* rName);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_NAME_H_*/
#endif /*_TD_COMMON_NAME_H_*/
......@@ -16,6 +16,7 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "trpc.h"
#ifdef __cplusplus
......@@ -154,6 +155,10 @@ struct SStreamTask {
STaskDispatcherShuffle shuffleDispatcher;
};
// msg buffer
int32_t memUsed;
STaosQueue* inputQ;
// application storage
void* ahandle;
};
......@@ -194,6 +199,8 @@ typedef struct {
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
int32_t streamEnqueueData(SStreamTask* pTask, const void* input, int32_t inputType);
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
#ifdef __cplusplus
......
......@@ -187,7 +187,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->autoCommit = false;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf;
......
......@@ -18,11 +18,9 @@
#include "tcommon.h"
#include "tstrbuild.h"
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN;
}
bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; }
#if 0
// TODO refactor
......@@ -95,12 +93,12 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
#endif
#endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
start += timezone * t;
......@@ -142,10 +140,10 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
int32_t tNameLen(const SName* name) {
assert(name != NULL);
char tmp[12] = {0};
char tmp[12] = {0};
int32_t len = sprintf(tmp, "%d", name->acctId);
int32_t len1 = (int32_t) strlen(name->dbname);
int32_t len2 = (int32_t) strlen(name->tname);
int32_t len1 = (int32_t)strlen(name->dbname);
int32_t len2 = (int32_t)strlen(name->tname);
if (name->type == TSDB_DB_NAME_T) {
assert(len2 == 0);
......@@ -200,9 +198,7 @@ const char* tNameGetTableName(const SName* name) {
return &name->tname[0];
}
void tNameAssign(SName* dst, const SName* src) {
memcpy(dst, src, sizeof(SName));
}
void tNameAssign(SName* dst, const SName* src) { memcpy(dst, src, sizeof(SName)); }
int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t nameLen) {
assert(dst != NULL && dbName != NULL && nameLen > 0);
......@@ -244,7 +240,6 @@ bool tNameDBNameEqual(SName* left, SName* right) {
return (0 == strcmp(left->dbname, right->dbname));
}
int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
assert(dst != NULL && str != NULL && strlen(str) > 0);
......@@ -260,14 +255,14 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
if ((type & T_NAME_DB) == T_NAME_DB) {
dst->type = TSDB_DB_NAME_T;
char* start = (char*)((p == NULL)? str:(p+1));
char* start = (char*)((p == NULL) ? str : (p + 1));
int32_t len = 0;
p = strstr(start, TS_PATH_DELIMITER);
if (p == NULL) {
len = (int32_t) strlen(start);
len = (int32_t)strlen(start);
} else {
len = (int32_t) (p - start);
len = (int32_t)(p - start);
}
// too long account id or too long db name
......@@ -275,21 +270,21 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return -1;
}
memcpy (dst->dbname, start, len);
memcpy(dst->dbname, start, len);
dst->dbname[len] = 0;
}
if ((type & T_NAME_TABLE) == T_NAME_TABLE) {
dst->type = TSDB_TABLE_NAME_T;
char* start = (char*) ((p == NULL)? str: (p+1));
char* start = (char*)((p == NULL) ? str : (p + 1));
// too long account id or too long db name
int32_t len = (int32_t) strlen(start);
int32_t len = (int32_t)strlen(start);
if ((len >= tListLen(dst->tname)) || (len <= 0)) {
return -1;
}
memcpy (dst->tname, start, len);
memcpy(dst->tname, start, len);
dst->tname[len] = 0;
}
......@@ -305,14 +300,14 @@ static int compareKv(const void* p1, const void* p2) {
if (res != 0) {
return res;
} else {
return kvLen1-kvLen2;
return kvLen1 - kvLen2;
}
}
/*
* use stable name and tags to grearate child table name
*/
void buildChildTableName(RandTableName *rName) {
void buildChildTableName(RandTableName* rName) {
int32_t size = taosArrayGetSize(rName->tags);
ASSERT(size > 0);
taosArraySort(rName->tags, compareKv);
......@@ -320,19 +315,19 @@ void buildChildTableName(RandTableName *rName) {
SStringBuilder sb = {0};
taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
for (int j = 0; j < size; ++j) {
SSmlKv *tagKv = taosArrayGetP(rName->tags, j);
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
tMD5Update(&context, (uint8_t*)keyJoined, (uint32_t)len);
tMD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8);
snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016" PRIx64 "%016" PRIx64, digest1, digest2);
taosStringBuilderDestroy(&sb);
rName->uid = digest1;
}
......@@ -196,7 +196,9 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
return pVgEpNew;
}
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); }
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
}
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0;
......
......@@ -298,6 +298,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
......@@ -307,16 +309,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
} else {
......@@ -331,6 +339,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFreeClear(topicObj.ast);
taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.physicalPlan);
return -1;
}
......
......@@ -233,16 +233,18 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
// make sure msgType == TDMT_VND_SUBMIT
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
return -1;
}
memcpy(data, msg, msgLen);
// make sure msgType == TDMT_VND_SUBMIT
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
return -1;
}
memcpy(data, msg, msgLen);
SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册