提交 fdc855af 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode

...@@ -192,9 +192,38 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); ...@@ -192,9 +192,38 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_message_topic_t tmq_message_topic_t;
typedef struct tmq_message_tb_t tmq_message_tb_t;
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
typedef struct tmq_message_col_t tmq_message_col_t;
typedef struct tmq_col_iter_t tmq_col_iter_t;
DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen);
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
DLL_EXPORT tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT int32_t tmq_topic_num(tmq_message_t* msg);
DLL_EXPORT char* tmq_get_topic(tmq_message_topic_t* msg);
DLL_EXPORT int32_t tmq_get_vgId(tmq_message_topic_t* msg);
DLL_EXPORT tmq_message_tb_t* tmq_get_next_tb(tmq_message_topic_t* msg, tmq_tb_iter_t* iter);
DLL_EXPORT tmq_message_col_t* tmq_get_next_col(tmq_message_tb_t* msg, tmq_col_iter_t* iter);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1519,7 +1519,8 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { ...@@ -1519,7 +1519,8 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
typedef struct SMqSetCVgReq { typedef struct SMqSetCVgReq {
int32_t vgId; int32_t vgId;
int64_t consumerId; int64_t oldConsumerId;
int64_t newConsumerId;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql; char* sql;
...@@ -1550,7 +1551,8 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { ...@@ -1550,7 +1551,8 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->topicName);
tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
...@@ -1562,7 +1564,8 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* ...@@ -1562,7 +1564,8 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->topicName);
buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
...@@ -1579,15 +1582,6 @@ typedef struct SMqSetCVgRsp { ...@@ -1579,15 +1582,6 @@ typedef struct SMqSetCVgRsp {
char cGroup[TSDB_CONSUMER_GROUP_LEN]; char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp; } SMqSetCVgRsp;
typedef struct SMqConsumeReq {
int64_t reqId;
int64_t offset;
int64_t consumerId;
int64_t blockingTime;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqConsumeReq;
typedef struct SMqColData { typedef struct SMqColData {
int16_t colId; int16_t colId;
int16_t type; int16_t type;
...@@ -1615,12 +1609,29 @@ typedef struct SMqTopicBlk { ...@@ -1615,12 +1609,29 @@ typedef struct SMqTopicBlk {
typedef struct SMqConsumeRsp { typedef struct SMqConsumeRsp {
int64_t reqId; int64_t reqId;
int64_t clientId; int64_t consumerId;
int32_t bodyLen; int32_t bodyLen;
int32_t numOfTopics; int32_t numOfTopics;
SMqTopicData data[]; SMqTopicData data[];
} SMqConsumeRsp; } SMqConsumeRsp;
// one req for one vg+topic
typedef struct SMqConsumeReq {
//0: commit only, current offset
//1: consume only, poll next offset
//2: commit current and consume next offset
int32_t reqType;
int64_t reqId;
int64_t consumerId;
int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t offset;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); ...@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes); int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
/** /**
* Process the query job, generated according to the query physical plan. * Process the query job, generated according to the query physical plan.
...@@ -80,7 +80,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru ...@@ -80,7 +80,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob); int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob);
/** /**
* Fetch query result from the remote query executor * Fetch query result from the remote query executor
...@@ -88,7 +88,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, ...@@ -88,7 +88,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
* @param data * @param data
* @return * @return
*/ */
int32_t scheduleFetchRows(struct SSchJob *pJob, void **data); int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
/** /**
...@@ -102,7 +102,7 @@ int32_t scheduleFetchRows(struct SSchJob *pJob, void **data); ...@@ -102,7 +102,7 @@ int32_t scheduleFetchRows(struct SSchJob *pJob, void **data);
* Free the query job * Free the query job
* @param pJob * @param pJob
*/ */
void scheduleFreeJob(void *pJob); void schedulerFreeJob(void *pJob);
void schedulerDestroy(void); void schedulerDestroy(void);
......
...@@ -354,7 +354,7 @@ int32_t* taosGetErrno(); ...@@ -354,7 +354,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist") #define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist")
#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist") #define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist")
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist") #define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist") #define TSDB_CODE_QRY_TASK_CTX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task context not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled") #define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped") #define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling") #define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
......
...@@ -67,7 +67,9 @@ static void deregisterRequest(SRequestObj* pRequest) { ...@@ -67,7 +67,9 @@ static void deregisterRequest(SRequestObj* pRequest) {
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); int64_t duration = taosGetTimestampMs() - pRequest->metric.start;
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", reqId:0x%"PRIx64" elapsed:%"PRIu64" ms, current:%d, app current:%d", pRequest->self, pTscObj->id,
pRequest->requestId, duration, num, currentInst);
taosReleaseRef(clientConnRefPool, pTscObj->id); taosReleaseRef(clientConnRefPool, pTscObj->id);
} }
......
...@@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
/*p->pAppHbMgr = appHbMgrInit(p);*/ p->pAppHbMgr = appHbMgrInit(p);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p; pInst = &p;
...@@ -237,12 +237,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -237,12 +237,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// handle error and retry // handle error and retry
} else { } else {
if (pRequest->body.pQueryJob != NULL) { if (pRequest->body.pQueryJob != NULL) {
scheduleFreeJob(pRequest->body.pQueryJob); schedulerFreeJob(pRequest->body.pQueryJob);
} }
} }
...@@ -251,17 +251,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -251,17 +251,11 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return pRequest->code; return pRequest->code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob); return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
} }
typedef struct tmq_t tmq_t;
typedef struct SMqClientTopic { typedef struct SMqClientVg {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
// statistics // statistics
int64_t consumeCnt; int64_t consumeCnt;
// offset // offset
...@@ -270,35 +264,159 @@ typedef struct SMqClientTopic { ...@@ -270,35 +264,159 @@ typedef struct SMqClientTopic {
//connection info //connection info
int32_t vgId; int32_t vgId;
SEpSet epSet; SEpSet epSet;
} SMqClientVg;
typedef struct SMqClientTopic {
// subscribe info
int32_t sqlLen;
char* sql;
char* topicName;
int64_t topicId;
int32_t nextVgIdx;
SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic; } SMqClientTopic;
typedef struct tmq_resp_err_t { typedef struct tmq_resp_err_t {
int32_t code; int32_t code;
} tmq_resp_err_t; } tmq_resp_err_t;
typedef struct tmq_topic_vgroup_list_t { typedef struct tmq_topic_vgroup_t {
char* topicName; char* topic;
int32_t vgId; int32_t vgId;
int64_t committedOffset; int64_t commitOffset;
} tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
} tmq_topic_vgroup_list_t; } tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
typedef struct tmq_conf_t{ struct tmq_conf_t {
char* clientId; char clientId[256];
char* groupId; char groupId[256];
char* ip; char* ip;
uint16_t port; uint16_t port;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
} tmq_conf_t; };
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
return conf;
}
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id")) {
strcpy(conf->groupId, value);
}
if (strcmp(key, "client.id")) {
strcpy(conf->clientId, value);
}
return 0;
}
struct tmq_t { struct tmq_t {
char groupId[256]; char groupId[256];
char clientId[256]; char clientId[256];
int64_t consumerId;
int64_t status;
STscObj* pTscObj; STscObj* pTscObj;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
SArray* clientTopics; // SArray<SMqClientTopic> int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
};
tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
if (pTmq == NULL) {
return NULL;
}
pTmq->pTscObj = (STscObj*)conn;
pTmq->status = 0;
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->commit_cb = conf->commit_cb;
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
return pTmq;
}
struct tmq_list_t {
int32_t cnt;
int32_t tot;
char* elems[];
}; };
tmq_list_t* tmq_list_new() {
tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*));
if (ptr == NULL) {
return ptr;
}
ptr->cnt = 0;
ptr->tot = 8;
return ptr;
}
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
if (ptr->cnt >= ptr->tot-1) return -1;
ptr->elems[ptr->cnt] = src;
ptr->cnt++;
return 0;
}
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL;
tmq->status = 1;
int32_t sz = topic_list->cnt;
tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) {
char* topicName = strdup(topic_list->elems[i]);
taosArrayPush(tmq->clientTopics, &topicName);
}
SCMSubscribeReq req;
req.topicNum = taosArrayGetSize(tmq->clientTopics);
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
req.topicNames = tmq->clientTopics;
int tlen = tSerializeSCMSubscribeReq(NULL, &req);
void* buf = malloc(tlen);
if(buf == NULL) {
goto _return;
}
void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT);
if (pRequest == NULL) {
tscError("failed to malloc sqlObj");
}
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
_return:
if (body != NULL) {
destroySendMsgInfo(body);
}
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
conf->commit_cb = cb; conf->commit_cb = cb;
...@@ -327,10 +445,10 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { ...@@ -327,10 +445,10 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
int sz = taosArrayGetSize(clientTopics); int sz = taosArrayGetSize(clientTopics);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
if (pCTopic->vgId == -1) { /*if (pCTopic->vgId == -1) {*/
pMqHb->status = 1; /*pMqHb->status = 1;*/
break; /*break;*/
} /*}*/
} }
kv.value = pMqHb; kv.value = pMqHb;
kv.valueLen = sizeof(SMqHbMsg); kv.valueLen = sizeof(SMqHbMsg);
...@@ -451,22 +569,63 @@ _return: ...@@ -451,22 +569,63 @@ _return:
return pRequest; return pRequest;
} }
typedef struct tmq_message_t { /*typedef SMqConsumeRsp tmq_message_t;*/
int32_t numOfRows;
char* topicName;
TAOS_ROW row[];
} tmq_message_t;
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) { struct tmq_message_t {
return NULL; SMqConsumeRsp rsp;
};
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
return NULL;
}
SRequestObj *pRequest = NULL;
SMqConsumeReq req = {0};
req.reqType = 1;
req.blockingTime = blocking_time;
req.consumerId = tmq->consumerId;
strcpy(req.cgroup, tmq->groupId);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(req.topic, pTopic->topicName);
int32_t nextVgIdx = pTopic->nextVgIdx;
pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
req.offset = pVg->currentOffset;
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
pRequest->type = TDMT_VND_CONSUME;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
return (tmq_message_t*)pRequest->body.resInfo.pData;
/*tsem_wait(&pRequest->body.rspSem);*/
/*if (body != NULL) {*/
/*destroySendMsgInfo(body);*/
/*}*/
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
/*pRequest->code = terrno;*/
/*}*/
/*return pRequest;*/
} }
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) { tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
SMqConsumeReq req = {0};
return NULL; return NULL;
} }
void tmq_message_destroy(tmq_message_t* mq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
} }
...@@ -711,7 +870,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -711,7 +870,7 @@ void* doFetchRow(SRequestObj* pRequest) {
} }
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData); int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; pRequest->code = code;
return NULL; return NULL;
......
...@@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
......
...@@ -526,29 +526,54 @@ TEST(testCase, show_table_Test) { ...@@ -526,29 +526,54 @@ TEST(testCase, show_table_Test) {
// taosHashCleanup(phash); // taosHashCleanup(phash);
//} //}
// //
//TEST(testCase, create_topic_Test) { TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
//
// char* sql = "select * from tu"; char* sql = "select * from tu";
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
TEST(testCase, tmq_subscribe_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
printf("get msg\n");
if (msg == NULL) break;
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_TEST) {
}
//TEST(testCase, insert_test) { //TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -681,4 +706,4 @@ TEST(testCase, projection_query_tables) { ...@@ -681,4 +706,4 @@ TEST(testCase, projection_query_tables) {
// taos_close(pConn); // taos_close(pConn);
//} //}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
...@@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
// Requests handled by VNODE // Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
......
...@@ -892,7 +892,7 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -892,7 +892,7 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false); int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, false);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
return code; return code;
} }
......
...@@ -19,14 +19,14 @@ ...@@ -19,14 +19,14 @@
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "scheduler.h"
#include "sync.h" #include "sync.h"
#include "tmsg.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "scheduler.h"
#include "mnode.h" #include "mnode.h"
...@@ -37,12 +37,42 @@ extern "C" { ...@@ -37,12 +37,42 @@ extern "C" {
extern int32_t mDebugFlag; extern int32_t mDebugFlag;
// mnode log function // mnode log function
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }} #define mFatal(...) \
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }} { \
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }} if (mDebugFlag & DEBUG_FATAL) { \
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", 255, __VA_ARGS__); }} taosPrintLog("MND FATAL ", 255, __VA_ARGS__); \
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} } \
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} }
#define mError(...) \
{ \
if (mDebugFlag & DEBUG_ERROR) { \
taosPrintLog("MND ERROR ", 255, __VA_ARGS__); \
} \
}
#define mWarn(...) \
{ \
if (mDebugFlag & DEBUG_WARN) { \
taosPrintLog("MND WARN ", 255, __VA_ARGS__); \
} \
}
#define mInfo(...) \
{ \
if (mDebugFlag & DEBUG_INFO) { \
taosPrintLog("MND ", 255, __VA_ARGS__); \
} \
}
#define mDebug(...) \
{ \
if (mDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); \
} \
}
#define mTrace(...) \
{ \
if (mDebugFlag & DEBUG_TRACE) { \
taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); \
} \
}
typedef enum { typedef enum {
MND_AUTH_ACCT_START = 0, MND_AUTH_ACCT_START = 0,
...@@ -96,13 +126,13 @@ typedef struct { ...@@ -96,13 +126,13 @@ typedef struct {
ETrnPolicy policy; ETrnPolicy policy;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
void *rpcHandle; void* rpcHandle;
void *rpcAHandle; void* rpcAHandle;
SArray *redoLogs; SArray* redoLogs;
SArray *undoLogs; SArray* undoLogs;
SArray *commitLogs; SArray* commitLogs;
SArray *redoActions; SArray* redoActions;
SArray *undoActions; SArray* undoActions;
} STrans; } STrans;
typedef struct { typedef struct {
...@@ -135,28 +165,28 @@ typedef struct { ...@@ -135,28 +165,28 @@ typedef struct {
ESyncState role; ESyncState role;
int32_t roleTerm; int32_t roleTerm;
int64_t roleTime; int64_t roleTime;
SDnodeObj *pDnode; SDnodeObj* pDnode;
} SMnodeObj; } SMnodeObj;
typedef struct { typedef struct {
int32_t id; int32_t id;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
SDnodeObj *pDnode; SDnodeObj* pDnode;
} SQnodeObj; } SQnodeObj;
typedef struct { typedef struct {
int32_t id; int32_t id;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
SDnodeObj *pDnode; SDnodeObj* pDnode;
} SSnodeObj; } SSnodeObj;
typedef struct { typedef struct {
int32_t id; int32_t id;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
SDnodeObj *pDnode; SDnodeObj* pDnode;
} SBnodeObj; } SBnodeObj;
typedef struct { typedef struct {
...@@ -201,7 +231,7 @@ typedef struct { ...@@ -201,7 +231,7 @@ typedef struct {
int64_t updateTime; int64_t updateTime;
int8_t superUser; int8_t superUser;
int32_t acctId; int32_t acctId;
SHashObj *prohibitDbHash; SHashObj* prohibitDbHash;
} SUserObj; } SUserObj;
typedef struct { typedef struct {
...@@ -226,15 +256,15 @@ typedef struct { ...@@ -226,15 +256,15 @@ typedef struct {
} SDbCfg; } SDbCfg;
typedef struct { typedef struct {
char name[TSDB_DB_FNAME_LEN]; char name[TSDB_DB_FNAME_LEN];
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; uint64_t uid;
int32_t cfgVersion; int32_t cfgVersion;
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; // default is 1 int8_t hashMethod; // default is 1
SDbCfg cfg; SDbCfg cfg;
} SDbObj; } SDbObj;
typedef struct { typedef struct {
...@@ -272,7 +302,7 @@ typedef struct { ...@@ -272,7 +302,7 @@ typedef struct {
int32_t numOfColumns; int32_t numOfColumns;
int32_t numOfTags; int32_t numOfTags;
SRWLatch lock; SRWLatch lock;
SSchema *pSchema; SSchema* pSchema;
} SStbObj; } SStbObj;
typedef struct { typedef struct {
...@@ -287,8 +317,8 @@ typedef struct { ...@@ -287,8 +317,8 @@ typedef struct {
int64_t sigature; int64_t sigature;
int32_t commentSize; int32_t commentSize;
int32_t codeSize; int32_t codeSize;
char *pComment; char* pComment;
char *pCode; char* pCode;
char pData[]; char pData[];
} SFuncObj; } SFuncObj;
...@@ -301,8 +331,8 @@ typedef struct { ...@@ -301,8 +331,8 @@ typedef struct {
int32_t numOfRows; int32_t numOfRows;
int32_t numOfReads; int32_t numOfReads;
int32_t payloadLen; int32_t payloadLen;
void *pIter; void* pIter;
SMnode *pMnode; SMnode* pMnode;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS];
...@@ -327,9 +357,10 @@ typedef struct SMqTopicConsumer { ...@@ -327,9 +357,10 @@ typedef struct SMqTopicConsumer {
#endif #endif
typedef struct SMqConsumerEp { typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned int32_t vgId; // -1 for unassigned
int32_t status;
SEpSet epSet; SEpSet epSet;
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs; int64_t lastConsumerHbTs;
int64_t lastVgHbTs; int64_t lastVgHbTs;
int32_t execLen; int32_t execLen;
...@@ -339,6 +370,7 @@ typedef struct SMqConsumerEp { ...@@ -339,6 +370,7 @@ typedef struct SMqConsumerEp {
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
...@@ -347,6 +379,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon ...@@ -347,6 +379,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
...@@ -354,16 +387,17 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu ...@@ -354,16 +387,17 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
return buf; return buf;
} }
//unit for rebalance // unit for rebalance
typedef struct SMqSubscribeObj { typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t epoch; int32_t epoch;
//TODO: replace with priority queue // TODO: replace with priority queue
int32_t nextConsumerIdx; int32_t nextConsumerIdx;
SArray* availConsumer; // SArray<int64_t> (consumerId) SArray* availConsumer; // SArray<int64_t> (consumerId)
SArray* assigned; // SArray<SMqConsumerEp> SArray* assigned; // SArray<SMqConsumerEp>
SArray* unassignedConsumer; // SArray<SMqConsumerEp> SArray* idleConsumer; // SArray<SMqConsumerEp>
SArray* unassignedVg; // SArray<SMqConsumerEp> SArray* lostConsumer; // SArray<SMqConsumerEp>
SArray* unassignedVg; // SArray<SMqConsumerEp>
} SMqSubscribeObj; } SMqSubscribeObj;
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
...@@ -384,17 +418,17 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { ...@@ -384,17 +418,17 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
free(pSub); free(pSub);
return NULL; return NULL;
} }
pSub->unassignedConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) { if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->unassignedConsumer); taosArrayDestroy(pSub->idleConsumer);
free(pSub); free(pSub);
return NULL; return NULL;
} }
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) { if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->unassignedConsumer); taosArrayDestroy(pSub->idleConsumer);
taosArrayDestroy(pSub->unassignedVg); taosArrayDestroy(pSub->unassignedVg);
free(pSub); free(pSub);
return NULL; return NULL;
...@@ -422,10 +456,10 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb ...@@ -422,10 +456,10 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
tlen += tEncodeSMqConsumerEp(buf, pCEp); tlen += tEncodeSMqConsumerEp(buf, pCEp);
} }
sz = taosArrayGetSize(pSub->unassignedConsumer); sz = taosArrayGetSize(pSub->idleConsumer);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedConsumer, i); SMqConsumerEp* pCEp = taosArrayGet(pSub->idleConsumer, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp); tlen += tEncodeSMqConsumerEp(buf, pCEp);
} }
...@@ -457,22 +491,22 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -457,22 +491,22 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
} }
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedConsumer == NULL) { if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp; SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedConsumer, &cEp); taosArrayPush(pSub->idleConsumer, &cEp);
} }
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->unassignedConsumer); taosArrayDestroy(pSub->idleConsumer);
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -487,38 +521,37 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -487,38 +521,37 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
typedef struct SMqCGroup { typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN]; char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
SList *consumerIds; // SList<int64_t> SList* consumerIds; // SList<int64_t>
SList *idleVGroups; // SList<int32_t> SList* idleVGroups; // SList<int32_t>
} SMqCGroup; } SMqCGroup;
typedef struct SMqTopicObj { typedef struct SMqTopicObj {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; uint64_t uid;
uint64_t dbUid; uint64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
int32_t sqlLen; int32_t sqlLen;
char *sql; char* sql;
char *logicalPlan; char* logicalPlan;
char *physicalPlan; char* physicalPlan;
//SHashObj *cgroups; // SHashObj<SMqCGroup> // SHashObj *cgroups; // SHashObj<SMqCGroup>
//SHashObj *consumers; // SHashObj<SMqConsumerObj> // SHashObj *consumers; // SHashObj<SMqConsumerObj>
} SMqTopicObj; } SMqTopicObj;
// TODO: add cache and change name to id // TODO: add cache and change name to id
typedef struct SMqConsumerTopic { typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
int32_t epoch; int32_t epoch;
//TODO: replace with something with ep // vg assigned to the consumer on the topic
//SList *vgroups; // SList<int32_t> SArray* pVgInfo; // SArray<int32_t>
//vg assigned to the consumer on the topic
SArray *pVgInfo; // SArray<int32_t>
} SMqConsumerTopic; } SMqConsumerTopic;
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic,
SMqSubscribeObj* pSub) {
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic)); SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
if (pCTopic == NULL) { if (pCTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -567,10 +600,11 @@ static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* p ...@@ -567,10 +600,11 @@ static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* p
typedef struct SMqConsumerObj { typedef struct SMqConsumerObj {
int64_t consumerId; int64_t consumerId;
int64_t connId;
SRWLatch lock; SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic> SArray* topics; // SArray<SMqConsumerTopic>
//SHashObj *topicHash; //SHashObj<SMqTopicObj> // SHashObj *topicHash; //SHashObj<SMqTopicObj>
} SMqConsumerObj; } SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
...@@ -602,12 +636,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons ...@@ -602,12 +636,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
typedef struct SMqSubConsumerObj { typedef struct SMqSubConsumerObj {
int64_t consumerUid; // if -1, unassigned int64_t consumerUid; // if -1, unassigned
SList *vgId; // SList<int32_t> SList* vgId; // SList<int32_t>
} SMqSubConsumerObj; } SMqSubConsumerObj;
typedef struct SMqSubCGroupObj { typedef struct SMqSubCGroupObj {
char name[TSDB_CONSUMER_GROUP_LEN]; char name[TSDB_CONSUMER_GROUP_LEN];
SList *consumers; // SList<SMqConsumerObj> SList* consumers; // SList<SMqConsumerObj>
} SMqSubCGroupObj; } SMqSubCGroupObj;
typedef struct SMqSubTopicObj { typedef struct SMqSubTopicObj {
...@@ -620,30 +654,30 @@ typedef struct SMqSubTopicObj { ...@@ -620,30 +654,30 @@ typedef struct SMqSubTopicObj {
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
int32_t sqlLen; int32_t sqlLen;
char *sql; char* sql;
char *logicalPlan; char* logicalPlan;
char *physicalPlan; char* physicalPlan;
SList *cgroups; // SList<SMqSubCGroupObj> SList* cgroups; // SList<SMqSubCGroupObj>
} SMqSubTopicObj; } SMqSubTopicObj;
typedef struct SMqConsumerSubObj { typedef struct SMqConsumerSubObj {
int64_t topicUid; int64_t topicUid;
SList *vgIds; // SList<int64_t> SList* vgIds; // SList<int64_t>
} SMqConsumerSubObj; } SMqConsumerSubObj;
typedef struct SMqConsumerHbObj { typedef struct SMqConsumerHbObj {
int64_t consumerId; int64_t consumerId;
SList *consumerSubs; // SList<SMqConsumerSubObj> SList* consumerSubs; // SList<SMqConsumerSubObj>
} SMqConsumerHbObj; } SMqConsumerHbObj;
typedef struct SMqVGroupSubObj { typedef struct SMqVGroupSubObj {
int64_t topicUid; int64_t topicUid;
SList *consumerIds; // SList<int64_t> SList* consumerIds; // SList<int64_t>
} SMqVGroupSubObj; } SMqVGroupSubObj;
typedef struct SMqVGroupHbObj { typedef struct SMqVGroupHbObj {
int64_t vgId; int64_t vgId;
SList *vgSubs; // SList<SMqVGroupSubObj> SList* vgSubs; // SList<SMqVGroupSubObj>
} SMqVGroupHbObj; } SMqVGroupHbObj;
#if 0 #if 0
...@@ -663,11 +697,11 @@ typedef struct SMnodeMsg { ...@@ -663,11 +697,11 @@ typedef struct SMnodeMsg {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int32_t acctId; int32_t acctId;
SMnode *pMnode; SMnode* pMnode;
int64_t createdTime; int64_t createdTime;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t contLen; int32_t contLen;
void *pCont; void* pCont;
} SMnodeMsg; } SMnodeMsg;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -98,7 +98,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -98,7 +98,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// build msg // build msg
SMqSetCVgReq req = { SMqSetCVgReq req = {
.vgId = pCEp->vgId, .vgId = pCEp->vgId,
.consumerId = consumerId, .oldConsumerId = -1,
.newConsumerId = consumerId,
}; };
strcpy(req.cgroup, cgroup); strcpy(req.cgroup, cgroup);
strcpy(req.topicName, topic); strcpy(req.topicName, topic);
...@@ -152,6 +153,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -152,6 +153,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
//convert dag to msg //convert dag to msg
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp; SMqConsumerEp CEp;
CEp.status = 0;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo* pTaskInfo = taosArrayGet(pArray, i); STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
...@@ -171,7 +173,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -171,7 +173,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = { SMqSetCVgReq req = {
.vgId = vgId, .vgId = vgId,
.consumerId = pConsumer->consumerId, .oldConsumerId = -1,
.newConsumerId = pConsumer->consumerId,
}; };
strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.cgroup, pConsumer->cgroup);
strcpy(req.topicName, pTopic->name); strcpy(req.topicName, pTopic->name);
...@@ -451,12 +454,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -451,12 +454,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
if (pTopic == NULL) { if (pTopic == NULL) {
/*terrno = */ mError("topic being subscribed not exist: %s", newTopicName);
continue; continue;
} }
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
if (pSub == NULL) { if (pSub == NULL) {
mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
pSub = tNewSubscribeObj(); pSub = tNewSubscribeObj();
if (pSub == NULL) { if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -464,14 +468,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -464,14 +468,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
} }
// set unassigned vg // set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
//TODO: disable alter
} }
taosArrayPush(pSub->availConsumer, &consumerId); taosArrayPush(pSub->availConsumer, &consumerId);
// TODO: no need
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
taosArrayPush(pConsumer->topics, pConsumerTopic); taosArrayPush(pConsumer->topics, pConsumerTopic);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1);
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
// send setmsg to vnode // send setmsg to vnode
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) { if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
...@@ -479,8 +484,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -479,8 +484,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1; return -1;
} }
} }
taosArrayDestroy(pConsumerTopic->pVgInfo);
free(pConsumerTopic);
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/ /*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
mndTransAppendRedolog(pTrans, pRaw); mndTransAppendRedolog(pTrans, pRaw);
...@@ -533,12 +537,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -533,12 +537,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
return -1; return -1;
} }
// TODO: free memory
if (newSub) taosArrayDestroy(newSub); if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "common.h" #include "common.h"
#include "executor.h" #include "executor.h"
#include "vnode.h"
#include "mallocator.h" #include "mallocator.h"
#include "meta.h" #include "meta.h"
#include "os.h" #include "os.h"
...@@ -29,6 +28,7 @@ ...@@ -29,6 +28,7 @@
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "vnode.h"
#include "wal.h" #include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -149,10 +149,11 @@ typedef struct STqGroup { ...@@ -149,10 +149,11 @@ typedef struct STqGroup {
} STqGroup; } STqGroup;
typedef struct STqTaskItem { typedef struct STqTaskItem {
int8_t status; int8_t status;
int64_t offset; int64_t offset;
void* dst; void* dst;
qTaskInfo_t task; qTaskInfo_t task;
SSubQueryMsg* pQueryMsg;
} STqTaskItem; } STqTaskItem;
// new version // new version
...@@ -164,7 +165,6 @@ typedef struct STqBuffer { ...@@ -164,7 +165,6 @@ typedef struct STqBuffer {
typedef struct STqTopicHandle { typedef struct STqTopicHandle {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_TOPIC_FNAME_LEN];
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
...@@ -177,6 +177,7 @@ typedef struct STqTopicHandle { ...@@ -177,6 +177,7 @@ typedef struct STqTopicHandle {
typedef struct STqConsumerHandle { typedef struct STqConsumerHandle {
int64_t consumerId; int64_t consumerId;
int64_t epoch; int64_t epoch;
char cgroup[TSDB_TOPIC_FNAME_LEN];
SArray* topics; // SArray<STqClientTopic> SArray* topics; // SArray<STqClientTopic>
} STqConsumerHandle; } STqConsumerHandle;
...@@ -318,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset); ...@@ -318,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif #endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -44,8 +44,10 @@ extern int32_t tqDebugFlag; ...@@ -44,8 +44,10 @@ extern int32_t tqDebugFlag;
// delete persistent storage for meta info // delete persistent storage for meta info
// int tqDropTCGroup(STQ*, const char* topic, int cgId); // int tqDropTCGroup(STQ*, const char* topic, int cgId);
int tqSerializeGroup(const STqGroup*, STqSerializedHead**); //int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup); //const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -68,7 +68,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl ...@@ -68,7 +68,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
// TODO: error code of buffer pool // TODO: error code of buffer pool
} }
#endif #endif
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, free, 0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
free(pTq); free(pTq);
#if 0 #if 0
...@@ -478,6 +478,59 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { ...@@ -478,6 +478,59 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
} }
#endif #endif
int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
int32_t num = taosArrayGetSize(pConsumer->topics);
int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz);
if (tmpPtr == NULL) {
free(*ppHead);
return -1;
}
*ppHead = tmpPtr;
(*ppHead)->ssize = sz;
}
void* ptr = (*ppHead)->content;
*(int64_t*)ptr = pConsumer->consumerId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pConsumer->epoch;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
memcpy(ptr, pConsumer->topics, TSDB_TOPIC_FNAME_LEN);
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
*(int32_t*)ptr = num;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for (int32_t i = 0; i < num; i++) {
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
memcpy(ptr, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
*(int64_t*)ptr = pTopic->committedOffset;
POINTER_SHIFT(ptr, sizeof(int64_t));
}
return 0;
}
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) {
STqConsumerHandle* pConsumer = *ppConsumer;
const void* ptr = pHead->content;
pConsumer->consumerId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
pConsumer->epoch = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
memcpy(pConsumer->cgroup, ptr, TSDB_TOPIC_FNAME_LEN);
ptr = POINTER_SHIFT(ptr, TSDB_TOPIC_FNAME_LEN);
int32_t sz = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
pConsumer->topics = taosArrayInit(sz, sizeof(STqTopicHandle));
for (int32_t i = 0; i < sz; i++) {
/*STqTopicHandle* topicHandle = */
/*taosArrayPush(pConsumer->topics, );*/
}
return NULL;
}
#if 0
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
// calculate size // calculate size
int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead); int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
...@@ -608,6 +661,7 @@ int tqItemSSize() { ...@@ -608,6 +661,7 @@ int tqItemSSize() {
// mainly for executor // mainly for executor
return 0; return 0;
} }
#endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
SMqConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
...@@ -625,7 +679,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -625,7 +679,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
//TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
continue;
}
if (fetchOffset == -1) {
fetchOffset = pTopic->committedOffset + 1;
}
int8_t pos; int8_t pos;
int8_t skip = 0; int8_t skip = 0;
SWalHead* pHead; SWalHead* pHead;
...@@ -670,6 +731,23 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -670,6 +731,23 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
break; break;
} }
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
SMqTbData tbData = {
.uid = pDataBlock->info.uid,
.numOfCols = pDataBlock->info.numOfCols,
.numOfRows = pDataBlock->info.rows,
};
for (int i = 0; i < pDataBlock->info.numOfCols; i++) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t sz = pColData->info.bytes * pDataBlock->info.rows;
SMqColData colData = {
.bytes = pColData->info.bytes,
.colId = pColData->info.colId,
.type = pColData->info.type,
};
memcpy(colData.data, pColData->pData, colData.bytes * pDataBlock->info.rows);
memcpy(&tbData.colData[i], &colData, sz);
}
/*pDataBlock->info.*/
taosArrayPush(pRes, pDataBlock); taosArrayPush(pRes, pDataBlock);
} else { } else {
break; break;
...@@ -692,29 +770,34 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -692,29 +770,34 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
pTopic->buffer.lastOffset = pReq->offset; pTopic->buffer.lastOffset = pReq->offset;
} }
// put output into rsp // put output into rsp
SMqConsumeRsp rsp = {
.consumerId = consumerId,
.numOfTopics = 1
};
} }
// launch query
// get result
return 0; return 0;
} }
int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req;
tDecodeSMqSetCVgReq(msg, &req);
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
if (pConsumer == NULL) { if (pConsumer == NULL) {
return -1; return -1;
} }
strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) { if (pTopic == NULL) {
free(pConsumer); free(pConsumer);
return -1; return -1;
} }
strcpy(pTopic->topicName, pReq->topicName); strcpy(pTopic->topicName, req.topicName);
strcpy(pTopic->cgroup, pReq->cgroup); strcpy(pTopic->sql, req.sql);
strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->logicalPlan, req.logicalPlan);
strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->physicalPlan, req.physicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
...@@ -724,9 +807,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { ...@@ -724,9 +807,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle);
} }
// write mq meta taosArrayPush(pConsumer->topics, pTopic);
return 0; return 0;
} }
......
...@@ -25,7 +25,7 @@ int vnodeQueryOpen(SVnode *pVnode) { ...@@ -25,7 +25,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
} }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("query message is processing"); vTrace("message in query queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:
...@@ -39,7 +39,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -39,7 +39,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("fetch message is processed"); vTrace("message in fetch queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
......
...@@ -112,9 +112,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -112,9 +112,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
SMqSetCVgReq req; if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) {
tDecodeSMqSetCVgReq(ptr, &req);
if (tqProcessSetConnReq(pVnode->pTq, &req) < 0) {
} }
} break; } break;
default: default:
......
...@@ -86,7 +86,7 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp ...@@ -86,7 +86,7 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for use db, code:%x, db:%s", rpcRsp.code, input->db); ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
...@@ -258,7 +258,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport ...@@ -258,7 +258,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ctgError("error rsp for stablemeta from mnode, code:%x, tbName:%s", rpcRsp.code, tbFullName); ctgError("error rsp for stablemeta from mnode, code:%s, tbName:%s", tstrerror(rpcRsp.code), tbFullName);
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
...@@ -320,18 +320,17 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, ...@@ -320,18 +320,17 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, tNameGetTableName(pTableName)); ctgError("error rsp for table meta from vnode, code:%s, tbName:%s", tstrerror(rpcRsp.code), tNameGetTableName(pTableName));
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen); code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) { if (code) {
ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName)); ctgError("Process vnode tablemeta rsp failed, code:%s, tbName:%s", tstrerror(code), tNameGetTableName(pTableName));
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName)); ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -22,17 +22,17 @@ extern "C" { ...@@ -22,17 +22,17 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
enum { enum {
QW_PHASE_PRE_QUERY = 1, QW_PHASE_PRE_QUERY = 1,
QW_PHASE_POST_QUERY, QW_PHASE_POST_QUERY,
QW_PHASE_PRE_CQUERY,
QW_PHASE_POST_CQUERY,
QW_PHASE_PRE_FETCH, QW_PHASE_PRE_FETCH,
QW_PHASE_POST_FETCH, QW_PHASE_POST_FETCH,
QW_PHASE_PRE_CQUERY,
QW_PHASE_POST_CQUERY,
}; };
enum { enum {
...@@ -83,11 +83,9 @@ typedef struct SQWMsg { ...@@ -83,11 +83,9 @@ typedef struct SQWMsg {
} SQWMsg; } SQWMsg;
typedef struct SQWPhaseInput { typedef struct SQWPhaseInput {
int8_t status; int8_t taskStatus;
int8_t taskType; int8_t taskType;
int32_t code; int32_t code;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWPhaseInput; } SQWPhaseInput;
typedef struct SQWPhaseOutput { typedef struct SQWPhaseOutput {
...@@ -111,6 +109,7 @@ typedef struct SQWTaskCtx { ...@@ -111,6 +109,7 @@ typedef struct SQWTaskCtx {
void *cancelConnection; void *cancelConnection;
bool emptyRes; bool emptyRes;
bool multiExec;
int8_t queryContinue; int8_t queryContinue;
int8_t queryInQueue; int8_t queryInQueue;
int32_t rspCode; int32_t rspCode;
...@@ -133,7 +132,7 @@ typedef struct SQWorkerMgmt { ...@@ -133,7 +132,7 @@ typedef struct SQWorkerMgmt {
int8_t nodeType; int8_t nodeType;
int32_t nodeId; int32_t nodeId;
SRWLatch schLock; SRWLatch schLock;
SRWLatch ctxLock; //SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj; void *nodeObj;
...@@ -144,6 +143,8 @@ typedef struct SQWorkerMgmt { ...@@ -144,6 +143,8 @@ typedef struct SQWorkerMgmt {
#define QW_IDS() sId, qId, tId #define QW_IDS() sId, qId, tId
#define QW_FPARAMS() mgmt, QW_IDS() #define QW_FPARAMS() mgmt, QW_IDS()
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED) #define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED) #define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
...@@ -151,9 +152,10 @@ typedef struct SQWorkerMgmt { ...@@ -151,9 +152,10 @@ typedef struct SQWorkerMgmt {
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase) #define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH) #define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) #define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
......
...@@ -30,6 +30,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -30,6 +30,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
......
此差异已折叠。
...@@ -50,6 +50,7 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { ...@@ -50,6 +50,7 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -68,6 +69,7 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) { ...@@ -68,6 +69,7 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -98,7 +100,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { ...@@ -98,7 +100,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = pMsg->msgType + 1, .msgType = TDMT_VND_TASKS_STATUS_RSP,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -121,6 +123,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ ...@@ -121,6 +123,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -138,6 +141,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -138,6 +141,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -155,6 +159,7 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) { ...@@ -155,6 +159,7 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -273,7 +278,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -273,7 +278,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen); QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -306,15 +311,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -306,15 +311,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWTaskCtx *handles = NULL; SQWTaskCtx *handles = NULL;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen); QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
...@@ -335,14 +336,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ ...@@ -335,14 +336,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
SResReadyReq *msg = pMsg->pCont; SResReadyReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg"); QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
msg->sId = be64toh(msg->sId); msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId); msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId); msg->taskId = be64toh(msg->taskId);
...@@ -398,6 +398,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -398,6 +398,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
......
...@@ -275,10 +275,13 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { ...@@ -275,10 +275,13 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
} }
int32_t schRecordTaskSucceedNode(SSchTask *pTask) { int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); int32_t idx = atomic_load_8(&pTask->candidateIdx);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx);
assert(NULL != addr); if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", idx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pTask->succeedAddr = *addr; pTask->succeedAddr = *addr;
...@@ -578,9 +581,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod ...@@ -578,9 +581,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
tsem_post(&pJob->rspSem); tsem_post(&pJob->rspSem);
} }
SCH_ERR_RET(atomic_load_32(&pJob->errCode)); int32_t code = atomic_load_32(&pJob->errCode);
SCH_ERR_RET(code);
assert(0); SCH_JOB_ELOG("job errCode is invalid, errCode:%d", code);
} }
...@@ -721,7 +725,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -721,7 +725,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pTask)); SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) { if (parentNum == 0) {
...@@ -738,7 +742,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -738,7 +742,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) { } else if (taskDone > pTask->level->taskNum) {
assert(0); SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
} }
if (pTask->level->taskFailed > 0) { if (pTask->level->taskFailed > 0) {
...@@ -871,18 +875,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -871,18 +875,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
} }
atomic_store_ptr(&pJob->res, rsp); atomic_store_ptr(&pJob->res, rsp);
atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows); atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) { if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
} }
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
SCH_ERR_JRET(schProcessOnDataFetched(pJob)); SCH_ERR_JRET(schProcessOnDataFetched(pJob));
break; break;
} }
case TDMT_VND_DROP_TASK: { case TDMT_VND_DROP_TASK_RSP: {
// SHOULD NEVER REACH HERE // SHOULD NEVER REACH HERE
assert(0); SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref));
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break; break;
} }
default: default:
...@@ -1030,6 +1037,8 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t ...@@ -1030,6 +1037,8 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code); qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1289,6 +1298,8 @@ void schDropJobAllTasks(SSchJob *pJob) { ...@@ -1289,6 +1298,8 @@ void schDropJobAllTasks(SSchJob *pJob) {
} }
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
qDebug("QID:%"PRIx64" job started", pDag->queryId);
if (nodeList && taosArrayGetSize(nodeList) <= 0) { if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
} }
...@@ -1356,7 +1367,7 @@ _return: ...@@ -1356,7 +1367,7 @@ _return:
*(SSchJob **)job = NULL; *(SSchJob **)job = NULL;
scheduleFreeJob(pJob); schedulerFreeJob(pJob);
SCH_RET(code); SCH_RET(code);
} }
...@@ -1401,7 +1412,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -1401,7 +1412,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) { int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -1418,7 +1429,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru ...@@ -1418,7 +1429,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) { int32_t schedulerAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -1551,7 +1562,7 @@ _return: ...@@ -1551,7 +1562,7 @@ _return:
} }
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) { if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -1616,11 +1627,12 @@ _return: ...@@ -1616,11 +1627,12 @@ _return:
} }
*pData = rsp; *pData = rsp;
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
} }
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_JOB_DLOG("fetch done, code:%s", tstrerror(code)); SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
atomic_sub_fetch_32(&pJob->ref, 1); atomic_sub_fetch_32(&pJob->ref, 1);
SCH_RET(code); SCH_RET(code);
...@@ -1638,7 +1650,7 @@ int32_t scheduleCancelJob(void *job) { ...@@ -1638,7 +1650,7 @@ int32_t scheduleCancelJob(void *job) {
SCH_RET(code); SCH_RET(code);
} }
void scheduleFreeJob(void *job) { void schedulerFreeJob(void *job) {
if (NULL == job) { if (NULL == job) {
return; return;
} }
...@@ -1667,7 +1679,8 @@ void scheduleFreeJob(void *job) { ...@@ -1667,7 +1679,8 @@ void scheduleFreeJob(void *job) {
usleep(1); usleep(1);
} else { } else {
assert(0); SCH_JOB_ELOG("invalid job ref number, ref:%d", ref);
break;
} }
} }
......
...@@ -334,7 +334,7 @@ void schtFreeQueryJob(int32_t freeThread) { ...@@ -334,7 +334,7 @@ void schtFreeQueryJob(int32_t freeThread) {
SSchJob *job = atomic_load_ptr(&pQueryJob); SSchJob *job = atomic_load_ptr(&pQueryJob);
if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) { if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) {
scheduleFreeJob(job); schedulerFreeJob(job);
if (freeThread) { if (freeThread) {
if (++freeNum % schtTestPrintNum == 0) { if (++freeNum % schtTestPrintNum == 0) {
printf("FreeNum:%d\n", freeNum); printf("FreeNum:%d\n", freeNum);
...@@ -378,7 +378,7 @@ void* schtRunJobThread(void *aa) { ...@@ -378,7 +378,7 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031; qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr); taosArrayPush(qnodeList, &qnodeAddr);
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job); code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job);
assert(code == 0); assert(code == 0);
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
...@@ -472,7 +472,7 @@ void* schtRunJobThread(void *aa) { ...@@ -472,7 +472,7 @@ void* schtRunJobThread(void *aa) {
atomic_store_32(&schtStartFetch, 1); atomic_store_32(&schtStartFetch, 1);
void *data = NULL; void *data = NULL;
code = scheduleFetchRows(pQueryJob, &data); code = schedulerFetchRows(pQueryJob, &data);
assert(code == 0 || code); assert(code == 0 || code);
if (0 == code) { if (0 == code) {
...@@ -482,7 +482,7 @@ void* schtRunJobThread(void *aa) { ...@@ -482,7 +482,7 @@ void* schtRunJobThread(void *aa) {
} }
data = NULL; data = NULL;
code = scheduleFetchRows(pQueryJob, &data); code = schedulerFetchRows(pQueryJob, &data);
assert(code == 0 || code); assert(code == 0 || code);
schtFreeQueryJob(0); schtFreeQueryJob(0);
...@@ -539,7 +539,7 @@ TEST(queryTest, normalCase) { ...@@ -539,7 +539,7 @@ TEST(queryTest, normalCase) {
schtSetExecNode(); schtSetExecNode();
schtSetAsyncSendMsgToServer(); schtSetAsyncSendMsgToServer();
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SSchJob *job = (SSchJob *)pJob; SSchJob *job = (SSchJob *)pJob;
...@@ -594,7 +594,7 @@ TEST(queryTest, normalCase) { ...@@ -594,7 +594,7 @@ TEST(queryTest, normalCase) {
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job); pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
void *data = NULL; void *data = NULL;
code = scheduleFetchRows(job, &data); code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
...@@ -603,11 +603,11 @@ TEST(queryTest, normalCase) { ...@@ -603,11 +603,11 @@ TEST(queryTest, normalCase) {
tfree(data); tfree(data);
data = NULL; data = NULL;
code = scheduleFetchRows(job, &data); code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_TRUE(data); ASSERT_TRUE(data);
scheduleFreeJob(pJob); schedulerFreeJob(pJob);
schtFreeQueryDag(&dag); schtFreeQueryDag(&dag);
...@@ -649,11 +649,11 @@ TEST(insertTest, normalCase) { ...@@ -649,11 +649,11 @@ TEST(insertTest, normalCase) {
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
SQueryResult res = {0}; SQueryResult res = {0};
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res); code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20); ASSERT_EQ(res.numOfRows, 20);
scheduleFreeJob(pInsertJob); schedulerFreeJob(pInsertJob);
schedulerDestroy(); schedulerDestroy();
} }
......
...@@ -353,7 +353,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input") ...@@ -353,7 +353,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST, "Task context not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册