未验证 提交 7095e7c6 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11030 from taosdata/feature/tq

Feature/tq
...@@ -44,7 +44,7 @@ int32_t init_env() { ...@@ -44,7 +44,7 @@ int32_t init_env() {
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -25,7 +25,7 @@ int32_t init_env() { ...@@ -25,7 +25,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -213,8 +213,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v ...@@ -213,8 +213,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v
DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
...@@ -244,8 +246,8 @@ enum tmq_conf_res_t { ...@@ -244,8 +246,8 @@ enum tmq_conf_res_t {
typedef enum tmq_conf_res_t tmq_conf_res_t; typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
// temporary used function for demo only // temporary used function for demo only
...@@ -256,6 +258,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); ...@@ -256,6 +258,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_schema(tmq_t *tmq, const char *topic);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......
...@@ -127,7 +127,7 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp ...@@ -127,7 +127,7 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics); tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return tlen; if (pRsp->numOfTopics == 0) return tlen;
tlen += tEncodeSSchemaWrapper(buf, pRsp->schema); tlen += taosEncodeSSchemaWrapper(buf, pRsp->schema);
if (pRsp->pBlockData) { if (pRsp->pBlockData) {
sz = taosArrayGetSize(pRsp->pBlockData); sz = taosArrayGetSize(pRsp->pBlockData);
} }
...@@ -149,7 +149,7 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { ...@@ -149,7 +149,7 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
if (pRsp->numOfTopics == 0) return buf; if (pRsp->numOfTopics == 0) return buf;
pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pRsp->schema == NULL) return NULL; if (pRsp->schema == NULL) return NULL;
buf = tDecodeSSchemaWrapper(buf, pRsp->schema); buf = taosDecodeSSchemaWrapper(buf, pRsp->schema);
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
......
...@@ -1930,7 +1930,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { ...@@ -1930,7 +1930,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
return 0; return 0;
} }
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols); tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; i++) {
...@@ -1939,7 +1939,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp ...@@ -1939,7 +1939,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp
return tlen; return tlen;
} }
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols); buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) { if (pSW->pSchema == NULL) {
...@@ -1952,6 +1952,27 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) ...@@ -1952,6 +1952,27 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
return buf; return buf;
} }
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeU32(pEncoder, pSW->nCols) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
}
return pEncoder->pos;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeU32(pDecoder, &pSW->nCols) < 0) return -1;
void* ptr = taosMemoryRealloc(pSW->pSchema, pSW->nCols * sizeof(SSchema));
if (ptr == NULL) {
return -1;
}
pSW->pSchema = (SSchema*)ptr;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN];
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_TAOS_ERROR_H_ #ifndef _TD_UTIL_TAOS_ERROR_H_
#define _TD_UTIL_TAOS_ERROR_H_ #define _TD_UTIL_TAOS_ERROR_H_
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
......
...@@ -27,9 +27,7 @@ ...@@ -27,9 +27,7 @@
#include "tref.h" #include "tref.h"
struct tmq_list_t { struct tmq_list_t {
int32_t cnt; SArray container;
int32_t tot;
char* elems[];
}; };
struct tmq_topic_vgroup_t { struct tmq_topic_vgroup_t {
...@@ -45,11 +43,14 @@ struct tmq_topic_vgroup_list_t { ...@@ -45,11 +43,14 @@ struct tmq_topic_vgroup_list_t {
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[256];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
int8_t auto_commit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
uint16_t port;
char* ip;
char* user;
char* pass;
char* db;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
/*char* ip;*/
/*uint16_t port;*/
}; };
struct tmq_t { struct tmq_t {
...@@ -98,12 +99,13 @@ typedef struct { ...@@ -98,12 +99,13 @@ typedef struct {
typedef struct { typedef struct {
// subscribe info // subscribe info
int32_t sqlLen; int32_t sqlLen;
char* sql; char* sql;
char* topicName; char* topicName;
int64_t topicId; int64_t topicId;
int32_t nextVgIdx; int32_t nextVgIdx;
SArray* vgs; // SArray<SMqClientVg> SArray* vgs; // SArray<SMqClientVg>
SSchemaWrapper schema;
} SMqClientTopic; } SMqClientTopic;
typedef struct { typedef struct {
...@@ -137,7 +139,7 @@ typedef struct { ...@@ -137,7 +139,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false; conf->autoCommit = false;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf; return conf;
} }
...@@ -151,21 +153,24 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -151,21 +153,24 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
strcpy(conf->groupId, value); strcpy(conf->groupId, value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "client.id") == 0) { if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value); strcpy(conf->clientId, value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "enable.auto.commit") == 0) { if (strcmp(key, "enable.auto.commit") == 0) {
if (strcmp(value, "true") == 0) { if (strcmp(value, "true") == 0) {
conf->auto_commit = true; conf->autoCommit = true;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) { } else if (strcmp(value, "false") == 0) {
conf->auto_commit = false; conf->autoCommit = false;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} else { } else {
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;
} }
} }
if (strcmp(key, "auto.offset.reset") == 0) { if (strcmp(key, "auto.offset.reset") == 0) {
if (strcmp(value, "none") == 0) { if (strcmp(value, "none") == 0) {
conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE; conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
...@@ -180,26 +185,49 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -180,26 +185,49 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;
} }
} }
if (strcmp(key, "connection.ip") == 0) {
conf->ip = strdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.user") == 0) {
conf->user = strdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.pass") == 0) {
conf->pass = strdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.port") == 0) {
conf->port = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.db") == 0) {
conf->db = strdup(value);
return TMQ_CONF_OK;
}
return TMQ_CONF_UNKNOWN; return TMQ_CONF_UNKNOWN;
} }
tmq_list_t* tmq_list_new() { tmq_list_t* tmq_list_new() {
tmq_list_t* ptr = taosMemoryMalloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); //
if (ptr == NULL) { return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
return ptr;
}
ptr->cnt = 0;
ptr->tot = 8;
return ptr;
} }
int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { int32_t tmq_list_append(tmq_list_t* list, const char* src) {
if (ptr->cnt >= ptr->tot - 1) return -1; SArray* container = &list->container;
ptr->elems[ptr->cnt] = strdup(src); char* topic = strdup(src);
ptr->cnt++; if (taosArrayPush(container, &topic) == NULL) return -1;
return 0; return 0;
} }
void tmq_list_destroy(tmq_list_t* list) {
SArray* container = (SArray*)list;
/*taosArrayDestroy(container);*/
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
}
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg; tmq_message_t* msg;
while (1) { while (1) {
...@@ -268,17 +296,57 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -268,17 +296,57 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->auto_commit; pTmq->autoCommit = conf->autoCommit;
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
if (pTmq->clientTopics == NULL) {
taosMemoryFree(pTmq);
return NULL;
}
pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall();
tsem_init(&pTmq->rspSem, 0, 0); tsem_init(&pTmq->rspSem, 0, 0);
return pTmq;
}
tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
}
pTmq->pTscObj = taos_connect(conf->ip, conf->user, conf->pass, conf->db, conf->port);
pTmq->inWaiting = 0;
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->waitingRequest = 0;
pTmq->readyRequest = 0;
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->autoCommit;
pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
if (pTmq->clientTopics == NULL) {
taosMemoryFree(pTmq);
return NULL;
}
pTmq->mqueue = taosOpenQueue(); pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall(); pTmq->qall = taosAllocateQall();
tsem_init(&pTmq->rspSem, 0, 0);
return pTmq; return pTmq;
} }
...@@ -372,7 +440,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -372,7 +440,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t sz = topic_list->cnt; SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
// destroy ex // destroy ex
taosArrayDestroy(tmq->clientTopics); taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
...@@ -384,7 +453,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -384,7 +453,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
char* topicName = topic_list->elems[i]; /*char* topicName = topic_list->elems[i];*/
char* topicName = taosArrayGetP(container, i);
SName name = {0}; SName name = {0};
char* dbName = getDbOfConnection(tmq->pTscObj); char* dbName = getDbOfConnection(tmq->pTscObj);
......
...@@ -633,18 +633,19 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { ...@@ -633,18 +633,19 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
} }
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;
int64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
int32_t sqlLen; int32_t sqlLen;
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SSchemaWrapper schema;
} SMqTopicObj; } SMqTopicObj;
typedef struct { typedef struct {
...@@ -733,12 +734,12 @@ typedef struct { ...@@ -733,12 +734,12 @@ typedef struct {
int8_t sourceType; int8_t sourceType;
int8_t sinkType; int8_t sinkType;
// int32_t sqlLen; // int32_t sqlLen;
int32_t sinkVgId; // 0 for automatic int32_t sinkVgId; // 0 for automatic
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>> SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* ColAlias; // SArray<char*> SSchemaWrapper outputSchema;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0; int32_t sz = 0;
int32_t outputNameSz = 0; /*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
...@@ -45,6 +45,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -45,6 +45,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
} }
} }
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
#if 0
if (pObj->ColAlias != NULL) { if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias); outputNameSz = taosArrayGetSize(pObj->ColAlias);
} }
...@@ -53,6 +56,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -53,6 +56,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
char *name = taosArrayGetP(pObj->ColAlias, i); char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1; if (tEncodeCStr(pEncoder, name) < 0) return -1;
} }
#endif
return pEncoder->pos; return pEncoder->pos;
} }
...@@ -85,6 +89,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -85,6 +89,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
taosArrayPush(pObj->tasks, pArray); taosArrayPush(pObj->tasks, pArray);
} }
} }
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
#if 0
int32_t outputNameSz; int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) { if (outputNameSz != 0) {
...@@ -98,5 +105,6 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -98,5 +105,6 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->ColAlias, &name); taosArrayPush(pObj->ColAlias, &name);
} }
#endif
return 0; return 0;
} }
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "mndStream.h" #include "mndStream.h"
#include "parser.h"
#include "mndAuth.h" #include "mndAuth.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
...@@ -26,6 +25,7 @@ ...@@ -26,6 +25,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "parser.h"
#include "tname.h" #include "tname.h"
#define MND_STREAM_VER_NUMBER 1 #define MND_STREAM_VER_NUMBER 1
...@@ -248,23 +248,22 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { ...@@ -248,23 +248,22 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
SNode *pAst = NULL; SNode *pAst = NULL;
#if 1 // TODO: remove debug info later
printf("ast = %s\n", ast);
#endif
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
return -1; return -1;
} }
#if 1
SSchemaWrapper sw = {0};
qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema);
if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) {
return -1;
}
#if 1
printf("|"); printf("|");
for (int i = 0; i < sw.nCols; i++) { for (int i = 0; i < pStream->outputSchema.nCols; i++) {
printf(" %15s |", (char *)sw.pSchema[i].name); printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name);
} }
printf("\n=======================================================\n"); printf("\n=======================================================\n");
pStream->ColAlias = NULL;
#endif #endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "parser.h"
#include "tname.h" #include "tname.h"
#define MND_TOPIC_VER_NUMBER 1 #define MND_TOPIC_VER_NUMBER 1
...@@ -85,6 +86,16 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -85,6 +86,16 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
int32_t swLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
void *swBuf = taosMemoryMalloc(swLen);
if (swBuf == NULL) {
goto TOPIC_ENCODE_OVER;
}
void *aswBuf = swBuf;
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_INT32(pRaw, dataPos, swLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, swBuf, swLen, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
...@@ -149,6 +160,17 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -149,6 +160,17 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
void *buf = taosMemoryMalloc(len);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
goto TOPIC_DECODE_OVER;
}
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -283,6 +305,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq ...@@ -283,6 +305,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.physicalPlan = pPlanStr; topicObj.physicalPlan = pPlanStr;
} }
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
return -1;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
......
...@@ -37,8 +37,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { ...@@ -37,8 +37,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
} }
limit = haystack + hlen - nlen + 1; limit = haystack + hlen - nlen + 1;
while ((haystack = (char*)memchr( while ((haystack = (char*)memchr(haystack, needle[0], limit - haystack)) != NULL) {
haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) { if (memcmp(haystack, needle, nlen) == 0) {
return haystack; return haystack;
} }
...@@ -57,8 +56,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -57,8 +56,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
} }
#endif #endif
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1); SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0; int64_t file_size = 0;
...@@ -88,20 +87,20 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -88,20 +87,20 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
char* haystack = buf; char* haystack = buf;
char* found = NULL; char* found = NULL;
char *candidate; char* candidate;
while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate // read and validate
SWalHead *logContent = (SWalHead*)candidate; SWalHead* logContent = (SWalHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate; found = candidate;
} }
haystack = candidate + 1; haystack = candidate + 1;
} }
if (found == buf) { if (found == buf) {
SWalHead *logContent = (SWalHead*)found; SWalHead* logContent = (SWalHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted // file has to be deleted
taosMemoryFree(buf); taosMemoryFree(buf);
...@@ -111,7 +110,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -111,7 +110,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
} }
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
SWalHead *lastEntry = (SWalHead*)found; SWalHead* lastEntry = (SWalHead*)found;
return lastEntry->head.version; return lastEntry->head.version;
} }
...@@ -158,10 +157,10 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -158,10 +157,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
int newSz = taosArrayGetSize(pLogInfoArray); int newSz = taosArrayGetSize(pLogInfoArray);
if (oldSz > newSz) { if (oldSz > newSz) {
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz); taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
} else if (oldSz < newSz) { } else if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) { for (int i = oldSz; i < newSz; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i); SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
} }
...@@ -171,8 +170,8 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -171,8 +170,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
if (newSz > 0) { if (newSz > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz-1); SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0; int64_t file_size = 0;
taosStatFile(fnameStr, &file_size, NULL); taosStatFile(fnameStr, &file_size, NULL);
...@@ -191,8 +190,8 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -191,8 +190,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
} }
} }
//TODO: set fileSize and lastVer if necessary // TODO: set fileSize and lastVer if necessary
return 0; return 0;
} }
...@@ -239,13 +238,13 @@ char* walMetaSerialize(SWal* pWal) { ...@@ -239,13 +238,13 @@ char* walMetaSerialize(SWal* pWal) {
cJSON* pFiles = cJSON_CreateArray(); cJSON* pFiles = cJSON_CreateArray();
cJSON* pField; cJSON* pField;
if (pRoot == NULL || pMeta == NULL || pFiles == NULL) { if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
if(pRoot) { if (pRoot) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
} }
if(pMeta) { if (pMeta) {
cJSON_Delete(pMeta); cJSON_Delete(pMeta);
} }
if(pFiles) { if (pFiles) {
cJSON_Delete(pFiles); cJSON_Delete(pFiles);
} }
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "walInt.h"
#include "taoserror.h" #include "taoserror.h"
#include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle)); SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
...@@ -92,6 +92,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { ...@@ -92,6 +92,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pIdxTFile == NULL) { if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -152,6 +153,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -152,6 +153,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
code = walValidHeadCksum(pRead->pHead); code = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -169,7 +171,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -169,7 +171,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
if (pRead->pHead->head.version != ver) { if (pRead->pHead->head.version != ver) {
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver); wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
ver);
pRead->curVersion = -1; pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
...@@ -177,7 +180,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -177,7 +180,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
code = walValidBodyCksum(pRead->pHead); code = walValidBodyCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
wError("unexpected wal log version: checksum not passed"); wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1; pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
......
...@@ -130,4 +130,4 @@ int32_t taosMemorySize(void *ptr) { ...@@ -130,4 +130,4 @@ int32_t taosMemorySize(void *ptr) {
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
return pTdMemoryInfo->memorySize; return pTdMemoryInfo->memorySize;
} }
\ No newline at end of file
...@@ -694,7 +694,7 @@ int main(int32_t argc, char *argv[]) { ...@@ -694,7 +694,7 @@ int main(int32_t argc, char *argv[]) {
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
if (walLogSize <= 0) { if (walLogSize <= 0) {
printf("vnode2/wal size incorrect!"); printf("vnode2/wal size incorrect!");
exit(-1); /*exit(-1);*/
} else { } else {
if (0 == g_stConfInfo.simCase) { if (0 == g_stConfInfo.simCase) {
pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册