提交 15ed48c3 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

...@@ -160,7 +160,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -160,7 +160,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
while (running) { while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
......
...@@ -224,10 +224,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); ...@@ -224,10 +224,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
#if 0 DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics);
#endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0 #if 0
......
...@@ -49,6 +49,11 @@ enum { ...@@ -49,6 +49,11 @@ enum {
TMQ_CONF__RESET_OFFSET__NONE = -3, TMQ_CONF__RESET_OFFSET__NONE = -3,
}; };
enum {
TMQ_MSG_TYPE__POLL_RSP = 0,
TMQ_MSG_TYPE__EP_RSP,
};
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray* pGroupList; SArray* pGroupList;
......
...@@ -1382,8 +1382,6 @@ typedef struct SMqCMGetSubEpReq { ...@@ -1382,8 +1382,6 @@ typedef struct SMqCMGetSubEpReq {
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCMGetSubEpReq; } SMqCMGetSubEpReq;
#pragma pack(pop)
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pMsg->contLen); tlen += taosEncodeFixedI32(buf, pMsg->contLen);
...@@ -1853,6 +1851,12 @@ typedef struct { ...@@ -1853,6 +1851,12 @@ typedef struct {
SMqTbData* tbData; SMqTbData* tbData;
} SMqTopicData; } SMqTopicData;
typedef struct {
int8_t mqMsgType;
int32_t code;
int32_t epoch;
} SMqRspHead;
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
SSchemaWrapper* schemas; SSchemaWrapper* schemas;
...@@ -1869,6 +1873,7 @@ typedef struct { ...@@ -1869,6 +1873,7 @@ typedef struct {
int64_t consumerId; int64_t consumerId;
int64_t blockingTime; int64_t blockingTime;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t currentOffset; int64_t currentOffset;
...@@ -1888,11 +1893,19 @@ typedef struct { ...@@ -1888,11 +1893,19 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp> SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp; } SMqCMGetSubEpRsp;
struct tmq_message_t {
SMqRspHead head;
union {
SMqConsumeRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp;
};
void* extra;
};
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
...@@ -1945,7 +1958,6 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE ...@@ -1945,7 +1958,6 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId); tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI32(buf, pRsp->epoch);
tlen += taosEncodeString(buf, pRsp->cgroup); tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics); int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
...@@ -1958,7 +1970,6 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu ...@@ -1958,7 +1970,6 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->consumerId); buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI32(buf, &pRsp->epoch);
buf = taosDecodeStringTo(buf, pRsp->cgroup); buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
...@@ -1974,6 +1985,8 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -1974,6 +1985,8 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
return buf; return buf;
} }
#pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -13,19 +13,17 @@ ...@@ -13,19 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "clientInt.h"
#include "trpc.h"
#include "catalog.h" #include "catalog.h"
#include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "trpc.h"
static SClientHbMgr clientHbMgr = {0}; static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread(); static int32_t hbCreateThread();
static void hbStopThread(); static void hbStopThread();
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
return 0;
}
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0; int32_t code = 0;
...@@ -39,8 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -39,8 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) { for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else { } else {
...@@ -60,8 +58,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -60,8 +58,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
taosHashCleanup(vgInfo.vgHash); taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
} }
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo);
} }
...@@ -106,8 +104,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo ...@@ -106,8 +104,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) { if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -116,7 +114,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs ...@@ -116,7 +114,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
tscDebug("hb got %d rsp kv", kvNum); tscDebug("hb got %d rsp kv", kvNum);
for (int32_t i = 0; i < kvNum; ++i) { for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pRsp->info, i); SKv *kv = taosArrayGet(pRsp->info, i);
switch (kv->key) { switch (kv->key) {
...@@ -126,30 +124,30 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs ...@@ -126,30 +124,30 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
break; break;
} }
int64_t *clusterId = (int64_t *)info->param; int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog); int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
break; break;
} }
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break; break;
} }
case HEARTBEAT_KEY_STBINFO:{ case HEARTBEAT_KEY_STBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) { if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
break; break;
} }
int64_t *clusterId = (int64_t *)info->param; int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog); int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
break; break;
} }
...@@ -165,22 +163,22 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs ...@@ -165,22 +163,22 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0; static int32_t emptyRspNum = 0;
if (code != 0) { if (code != 0) {
tfree(param); tfree(param);
return -1; return -1;
} }
char *key = (char *)param; char *key = (char *)param;
SClientHbBatchRsp pRsp = {0}; SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
int32_t rspNum = taosArrayGetSize(pRsp.rsps); int32_t rspNum = taosArrayGetSize(pRsp.rsps);
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL || NULL == *pInst) { if (pInst == NULL || NULL == *pInst) {
tscError("cluster not exist, key:%s", key); tscError("cluster not exist, key:%s", key);
tfree(param); tfree(param);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return -1; return -1;
...@@ -189,13 +187,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code ...@@ -189,13 +187,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree(param); tfree(param);
if (rspNum) { if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else { } else {
atomic_add_fetch_32(&emptyRspNum, 1); atomic_add_fetch_32(&emptyRspNum, 1);
} }
for (int32_t i = 0; i < rspNum; ++i) { for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp);
if (code) { if (code) {
break; break;
...@@ -203,14 +202,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code ...@@ -203,14 +202,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
} }
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return code; return code;
} }
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
uint32_t dbNum = 0; uint32_t dbNum = 0;
int32_t code = 0; int32_t code = 0;
code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -238,8 +237,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl ...@@ -238,8 +237,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableMetaVersion *stbs = NULL; SSTableMetaVersion *stbs = NULL;
uint32_t stbNum = 0; uint32_t stbNum = 0;
int32_t code = 0; int32_t code = 0;
code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -254,7 +253,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC ...@@ -254,7 +253,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
SSTableMetaVersion *stb = &stbs[i]; SSTableMetaVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid); stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion); stb->sversion = htons(stb->sversion);
stb->tversion = htons(stb->tversion); stb->tversion = htons(stb->tversion);
} }
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
...@@ -266,17 +265,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC ...@@ -266,17 +265,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { int64_t *clusterId = (int64_t *)param;
int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog); int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
return code; return code;
} }
code = hbGetExpiredDBInfo(connKey, pCatalog, req); code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
...@@ -287,13 +285,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req ...@@ -287,13 +285,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
return code; return code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {}
}
void hbMgrInitMqHbHandle() { void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
...@@ -312,10 +307,8 @@ void hbFreeReq(void *req) { ...@@ -312,10 +307,8 @@ void hbFreeReq(void *req) {
tFreeReqKvHash(pReq->info); tFreeReqKvHash(pReq->info);
} }
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = calloc(1, sizeof(SClientHbBatchReq));
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq* pBatchReq = calloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) { if (pBatchReq == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -324,11 +317,11 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -324,11 +317,11 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
int32_t code = 0; int32_t code = 0;
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SClientHbReq* pOneReq = pIter; SClientHbReq *pOneReq = pIter;
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) { if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq);
if (code) { if (code) {
...@@ -350,11 +343,10 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -350,11 +343,10 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return pBatchReq; return pBatchReq;
} }
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SClientHbReq* pOneReq = pIter; SClientHbReq *pOneReq = pIter;
tFreeReqKvHash(pOneReq->info); tFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info); taosHashClear(pOneReq->info);
...@@ -363,31 +355,29 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { ...@@ -363,31 +355,29 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
} }
} }
static void *hbThreadFunc(void *param) {
static void* hbThreadFunc(void* param) {
setThreadName("hb"); setThreadName("hb");
while (1) { while (1) {
int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2);
if(1 == threadStop) { if (1 == threadStop) {
break; break;
} }
pthread_mutex_lock(&clientHbMgr.lock); pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for(int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) { if (connCnt == 0) {
continue; continue;
} }
SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
if (pReq == NULL) { if (pReq == NULL) {
continue; continue;
} }
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
void *buf = malloc(tlen); void *buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -395,7 +385,7 @@ static void* hbThreadFunc(void* param) { ...@@ -395,7 +385,7 @@ static void* hbThreadFunc(void* param) {
hbClearReqInfo(pAppHbMgr); hbClearReqInfo(pAppHbMgr);
break; break;
} }
tSerializeSClientHbBatchReq(buf, tlen, pReq); tSerializeSClientHbBatchReq(buf, tlen, pReq);
SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo));
...@@ -415,17 +405,17 @@ static void* hbThreadFunc(void* param) { ...@@ -415,17 +405,17 @@ static void* hbThreadFunc(void* param) {
pInfo->requestObjRefId = 0; pInfo->requestObjRefId = 0;
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0; int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq, false);
hbClearReqInfo(pAppHbMgr); hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
} }
pthread_mutex_unlock(&clientHbMgr.lock); pthread_mutex_unlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL); taosMsleep(HEARTBEAT_INTERVAL);
} }
return NULL; return NULL;
...@@ -449,17 +439,18 @@ static void hbStopThread() { ...@@ -449,17 +439,18 @@ static void hbStopThread() {
tscDebug("hb thread already stopped"); tscDebug("hb thread already stopped");
return; return;
} }
while (2 != atomic_load_8(&clientHbMgr.threadStop)) { while (2 != atomic_load_8(&clientHbMgr.threadStop)) {
usleep(10); usleep(10);
} }
tscDebug("hb thread stopped"); tscDebug("hb thread stopped");
} }
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
/*return NULL;*/
hbMgrInit(); hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -495,7 +486,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { ...@@ -495,7 +486,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
pthread_mutex_lock(&clientHbMgr.lock); pthread_mutex_lock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
pthread_mutex_unlock(&clientHbMgr.lock); pthread_mutex_unlock(&clientHbMgr.lock);
return pAppHbMgr; return pAppHbMgr;
} }
...@@ -504,7 +495,7 @@ void appHbMgrCleanup(void) { ...@@ -504,7 +495,7 @@ void appHbMgrCleanup(void) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
taosHashCleanup(pTarget->activeInfo); taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL; pTarget->activeInfo = NULL;
taosHashCleanup(pTarget->connInfo); taosHashCleanup(pTarget->connInfo);
...@@ -515,11 +506,12 @@ void appHbMgrCleanup(void) { ...@@ -515,11 +506,12 @@ void appHbMgrCleanup(void) {
} }
int hbMgrInit() { int hbMgrInit() {
/*return 0;*/
// init once // init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0; if (old == 1) return 0;
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
pthread_mutex_init(&clientHbMgr.lock, NULL); pthread_mutex_init(&clientHbMgr.lock, NULL);
// init handle funcs // init handle funcs
...@@ -534,34 +526,34 @@ int hbMgrInit() { ...@@ -534,34 +526,34 @@ int hbMgrInit() {
void hbMgrCleanUp() { void hbMgrCleanUp() {
return; return;
hbStopThread(); hbStopThread();
// destroy all appHbMgr // destroy all appHbMgr
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return; if (old == 0) return;
pthread_mutex_lock(&clientHbMgr.lock); pthread_mutex_lock(&clientHbMgr.lock);
appHbMgrCleanup(); appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs); taosArrayDestroy(clientHbMgr.appHbMgrs);
pthread_mutex_unlock(&clientHbMgr.lock); pthread_mutex_unlock(&clientHbMgr.lock);
clientHbMgr.appHbMgrs = NULL; clientHbMgr.appHbMgrs = NULL;
} }
int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
// init hash in activeinfo // init hash in activeinfo
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) { if (data != NULL) {
return 0; return 0;
} }
SClientHbReq hbReq; SClientHbReq hbReq;
hbReq.connKey = connKey; hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash // init hash
if (info != NULL) { if (info != NULL) {
SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
info->req = pReq; info->req = pReq;
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
} }
...@@ -570,9 +562,10 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -570,9 +562,10 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
return 0; return 0;
} }
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
/*return 0;*/
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0}; SHbConnInfo info = {0};
switch (hbType) { switch (hbType) {
case HEARTBEAT_TYPE_QUERY: { case HEARTBEAT_TYPE_QUERY: {
...@@ -588,11 +581,12 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 ...@@ -588,11 +581,12 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3
default: default:
break; break;
} }
return hbRegisterConnImpl(pAppHbMgr, connKey, &info); return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
} }
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
/*return;*/
int32_t code = 0; int32_t code = 0;
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
...@@ -602,9 +596,11 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { ...@@ -602,9 +596,11 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
} }
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
int32_t valueLen) {
return 0;
// find req by connection id // find req by connection id
SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL); ASSERT(pReq != NULL);
taosHashPut(pReq->info, key, keyLen, value, valueLen); taosHashPut(pReq->info, key, keyLen, value, valueLen);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tqueue.h"
#include "tref.h" #include "tref.h"
struct tmq_list_t { struct tmq_list_t {
...@@ -58,22 +59,28 @@ struct tmq_t { ...@@ -58,22 +59,28 @@ struct tmq_t {
char groupId[256]; char groupId[256];
char clientId[256]; char clientId[256];
int8_t autoCommit; int8_t autoCommit;
SRWLatch lock;
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
int64_t status; int64_t status;
tsem_t rspSem;
STscObj* pTscObj; STscObj* pTscObj;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
int32_t nextTopicIdx; int32_t nextTopicIdx;
SArray* clientTopics; // SArray<SMqClientTopic> SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
// stat // stat
int64_t pollCnt; int64_t pollCnt;
}; };
struct tmq_message_t { enum {
SMqConsumeRsp rsp; TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT,
};
enum {
TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY,
}; };
typedef struct { typedef struct {
...@@ -83,6 +90,7 @@ typedef struct { ...@@ -83,6 +90,7 @@ typedef struct {
int64_t currentOffset; int64_t currentOffset;
// connection info // connection info
int32_t vgId; int32_t vgId;
int32_t vgStatus;
SEpSet epSet; SEpSet epSet;
} SMqClientVg; } SMqClientVg;
...@@ -104,15 +112,16 @@ typedef struct { ...@@ -104,15 +112,16 @@ typedef struct {
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
int32_t wait; int32_t sync;
tsem_t rspSem;
} SMqAskEpCbParam; } SMqAskEpCbParam;
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
SMqClientVg* pVg; SMqClientVg* pVg;
tmq_message_t** retMsg; int32_t epoch;
tsem_t rspSem; tsem_t rspSem;
} SMqConsumeCbParam; } SMqPollCbParam;
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
...@@ -125,7 +134,7 @@ typedef struct { ...@@ -125,7 +134,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false; conf->auto_commit = false;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf; return conf;
} }
...@@ -209,6 +218,22 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -209,6 +218,22 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if (*topics == NULL) {
*topics = tmq_list_new();
}
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i);
tmq_list_append(*topics, strdup(topic->topicName));
}
return TMQ_RESP_ERR__SUCCESS;
}
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
tmq_list_t* lst = tmq_list_new();
return tmq_subscribe(tmq, lst);
}
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1); tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
if (pTmq == NULL) { if (pTmq == NULL) {
...@@ -218,7 +243,6 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -218,7 +243,6 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->status = 0; pTmq->status = 0;
pTmq->pollCnt = 0; pTmq->pollCnt = 0;
pTmq->epoch = 0; pTmq->epoch = 0;
taosInitRWLatch(&pTmq->lock);
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
...@@ -226,9 +250,11 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -226,9 +250,11 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall();
return pTmq; return pTmq;
} }
...@@ -290,7 +316,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -290,7 +316,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
pParam->tmq = tmq; pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0); tsem_init(&pParam->rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = pParam; sendInfo->param = pParam;
...@@ -365,10 +395,17 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -365,10 +395,17 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tscError("failed to malloc request"); tscError("failed to malloc request");
} }
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; SMqSubscribeCbParam param = {
.rspErr = TMQ_RESP_ERR__SUCCESS,
.tmq = tmq,
};
tsem_init(&param.rspSem, 0, 0); tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param; sendInfo->param = &param;
...@@ -391,36 +428,6 @@ _return: ...@@ -391,36 +428,6 @@ _return:
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
tmq_t* pTmq = (void*)param;
SArray* pArray = taosArrayInit(0, sizeof(SKv));
if (pArray == NULL) {
return NULL;
}
SKv kv = {0};
kv.key = HEARTBEAT_KEY_MQ_TMP;
SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (pMqHb == NULL) {
return pArray;
}
pMqHb->consumerId = connKey.connId;
SArray* clientTopics = pTmq->clientTopics;
int sz = taosArrayGetSize(clientTopics);
for (int i = 0; i < sz; i++) {
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
/*if (pCTopic->vgId == -1) {*/
/*pMqHb->status = 1;*/
/*break;*/
/*}*/
}
kv.value = pMqHb;
kv.valueLen = sizeof(SMqHbMsg);
taosArrayPush(pArray, &kv);
return pArray;
}
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
...@@ -578,7 +585,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) { ...@@ -578,7 +585,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
static bool noPrintSchema; static bool noPrintSchema;
char pBuf[128]; char pBuf[128];
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
int32_t colNum = pRsp->schemas->nCols; int32_t colNum = pRsp->schemas->nCols;
if (!noPrintSchema) { if (!noPrintSchema) {
printf("|"); printf("|");
...@@ -618,94 +625,125 @@ void tmqShowMsg(tmq_message_t* tmq_message) { ...@@ -618,94 +625,125 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
} }
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; printf("recv poll\n");
SMqClientVg* pVg = pParam->pVg; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
printf("msg discard\n"); printf("msg discard\n");
tsem_post(&pParam->rspSem); if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
return 0; return 0;
} }
SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
return 0;
}
if (msgEpoch != tmqEpoch) {
printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
}
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) { if (pRsp == NULL) {
tsem_post(&pParam->rspSem); printf("fail\n");
return -1; return -1;
} }
tDecodeSMqConsumeRsp(pMsg->pData, pRsp); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->numOfTopics == 0) { if (pRsp->consumeRsp.numOfTopics == 0) {
/*printf("no data\n");*/ printf("no data\n");
free(pRsp); if (pParam->epoch == tmq->epoch) {
tsem_post(&pParam->rspSem); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosFreeQitem(pRsp);
return 0; return 0;
} }
*pParam->retMsg = (tmq_message_t*)pRsp; pRsp->extra = pParam->pVg;
pVg->currentOffset = pRsp->rspOffset; taosWriteQitem(tmq->mqueue, pRsp);
printf("poll in queue\n");
/*pParam->rspMsg = (tmq_message_t*)pRsp;*/
/*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/ /*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/ /*printf("-----msg begin----\n");*/
tsem_post(&pParam->rspSem);
/*printf("\n-----msg end------\n");*/ /*printf("\n-----msg end------\n");*/
return 0; return 0;
} }
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics);
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = pVgEp->offset,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
};
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(tmq->clientTopics, &topic);
}
atomic_store_32(&tmq->epoch, epoch);
return set;
}
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait); printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
if (pParam->wait) { if (pParam->sync) {
tsem_post(&tmq->rspSem); tsem_post(&pParam->rspSem);
} }
return 0; return 0;
} }
tscDebug("tmq ask ep cb called"); tscDebug("tmq ask ep cb called");
bool set = false; if (pParam->sync) {
SMqCMGetSubEpRsp rsp; SMqRspHead* head = pMsg->pData;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); SMqCMGetSubEpRsp rsp;
int32_t sz = taosArrayGetSize(rsp.topics); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
// TODO: lock /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ int32_t epoch = atomic_load_32(&tmq->epoch);
if (rsp.epoch != tmq->epoch) { if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) {
// TODO atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
// clang-format off
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = pVgEp->offset,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
// clang-format on
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(tmq->clientTopics, &topic);
} }
tmq->epoch = rsp.epoch; tsem_post(&pParam->rspSem);
} tDeleteSMqCMGetSubEpRsp(&rsp);
if (set) { } else {
atomic_store_64(&tmq->status, 1); tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
} if (pRsp == NULL) {
// unlock terrno = TSDB_CODE_OUT_OF_MEMORY;
/*tsem_post(&tmq->rspSem);*/ return -1;
if (pParam->wait) { }
tsem_post(&tmq->rspSem); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp);
taosWriteQitem(tmq->mqueue, pRsp);
} }
tDeleteSMqCMGetSubEpRsp(&rsp);
return 0; return 0;
} }
int32_t tmqAskEp(tmq_t* tmq, bool wait) { int32_t tmqAskEp(tmq_t* tmq, bool sync) {
printf("ask ep sync %d\n", sync);
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen); SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
...@@ -722,7 +760,11 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { ...@@ -722,7 +760,11 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) {
goto END; goto END;
} }
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam));
if (pParam == NULL) { if (pParam == NULL) {
...@@ -730,7 +772,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { ...@@ -730,7 +772,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) {
goto END; goto END;
} }
pParam->tmq = tmq; pParam->tmq = tmq;
pParam->wait = wait; pParam->sync = sync;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
...@@ -743,7 +786,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { ...@@ -743,7 +786,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) {
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
END: END:
if (wait) tsem_wait(&tmq->rspSem); if (sync) tsem_wait(&pParam->rspSem);
return 0; return 0;
} }
...@@ -791,6 +834,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie ...@@ -791,6 +834,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie
pReq->blockingTime = blocking_time; pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId; pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;
pReq->currentOffset = reqOffset; pReq->currentOffset = reqOffset;
pReq->head.vgId = htonl(pVg->vgId); pReq->head.vgId = htonl(pVg->vgId);
...@@ -798,11 +842,158 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie ...@@ -798,11 +842,158 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie
return pReq; return pReq;
} }
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg;
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
printf("call poll\n");
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
continue;
}
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return -1;
}
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
if (param == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return -1;
}
param->tmq = tmq;
param->pVg = pVg;
param->epoch = tmq->epoch;
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqConsumeReq),
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = param;
sendInfo->fp = tmqPollCb;
int64_t transporterId = 0;
printf("send poll\n");
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
}
}
return 0;
}
// return
int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);
if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) {
tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp);
tmqClearUnhandleMsg(tmq);
*pReset = true;
} else {
*pReset = false;
}
} else {
return -1;
}
return 0;
}
tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
while (1) {
tmq_message_t* rspMsg = NULL;
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
break;
}
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);
if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) {
printf("epoch match\n");
SMqClientVg* pVg = rspMsg->extra;
pVg->currentOffset = rspMsg->consumeRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return rspMsg;
} else {
printf("epoch mismatch\n");
taosFreeQitem(rspMsg);
}
} else {
printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);
bool reset = false;
tmqHandleRes(tmq, rspMsg, &reset);
taosFreeQitem(rspMsg);
if (pollIfReset && reset) {
printf("reset and repoll\n");
tmqPollImpl(tmq, blockingTime);
}
}
}
return NULL;
}
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_message = NULL; tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status); int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == 0); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
}
tmqHandleAllRsp(tmq, blocking_time, false);
tmqPollImpl(tmq, blocking_time);
while (1) {
/*printf("cycle\n");*/
taosReadAllQitems(tmq->mqueue, tmq->qall);
rspMsg = tmqHandleAllRsp(tmq, blocking_time, true);
if (rspMsg) {
return rspMsg;
}
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
printf("normal exit\n");
return NULL;
}
}
}
}
#if 0
if (blocking_time <= 0) blocking_time = 1; if (blocking_time <= 0) blocking_time = 1;
if (blocking_time > 1000) blocking_time = 1000; if (blocking_time > 1000) blocking_time = 1000;
...@@ -834,7 +1025,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -834,7 +1025,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
return NULL; return NULL;
} }
SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
if (param == NULL) { if (param == NULL) {
ASSERT(false); ASSERT(false);
usleep(blocking_time * 1000); usleep(blocking_time * 1000);
...@@ -846,7 +1037,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -846,7 +1037,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tsem_init(&param->rspSem, 0, 0); tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqConsumeReq),
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
...@@ -886,6 +1081,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -886,6 +1081,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*return pRequest;*/ /*return pRequest;*/
} }
#endif
#if 0 #if 0
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
...@@ -929,9 +1125,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -929,9 +1125,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
void tmq_message_destroy(tmq_message_t* tmq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; SMqConsumeRsp* pRsp = &tmq_message->consumeRsp;
tDeleteSMqConsumeRsp(pRsp); tDeleteSMqConsumeRsp(pRsp);
free(tmq_message); taosFreeQitem(tmq_message);
} }
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
......
...@@ -271,7 +271,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { ...@@ -271,7 +271,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
if (cfgAddTimezone(pCfg, "timezone", tsTimezone) != 0) return -1; if (cfgAddTimezone(pCfg, "timezone", tsTimezone) != 0) return -1;
if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1; if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1;
if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1; if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1;
if (cfgAddBool(pCfg, "enableCoreFile", 0, 1) != 0) return -1; if (cfgAddBool(pCfg, "enableCoreFile", 1, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "pageSize(KB)", tsPageSize, 0, INT64_MAX, 1) != 0) return -1; if (cfgAddInt32(pCfg, "pageSize(KB)", tsPageSize, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1;
......
...@@ -270,7 +270,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR ...@@ -270,7 +270,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
int32_t rspNum = 0; int32_t rspNum = 0;
if (tDecodeI32(&decoder, &rspNum) < 0) return -1; if (tDecodeI32(&decoder, &rspNum) < 0) return -1;
if (pBatchRsp->rsps == NULL) { if (pBatchRsp->rsps == NULL) {
pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbReq)); pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp));
} }
for (int32_t i = 0; i < rspNum; i++) { for (int32_t i = 0; i < rspNum; i++) {
SClientHbRsp rsp = {0}; SClientHbRsp rsp = {0};
...@@ -1529,7 +1529,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { ...@@ -1529,7 +1529,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
if (pRsp->vgNum <= 0) { if (pRsp->vgNum <= 0) {
return 0; return 0;
} }
pRsp->pVgroupInfos = taosArrayInit(pRsp->vgNum, sizeof(SVgroupInfo)); pRsp->pVgroupInfos = taosArrayInit(pRsp->vgNum, sizeof(SVgroupInfo));
if (pRsp->pVgroupInfos == NULL) { if (pRsp->pVgroupInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -114,7 +114,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -114,7 +114,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg;
......
...@@ -270,9 +270,8 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -270,9 +270,8 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy(rsp.cgroup, pReq->cgroup); strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId; rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch; if (epoch != pConsumer->epoch) {
if (epoch != rsp.epoch) { mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
SArray *pTopics = pConsumer->currentTopics; SArray *pTopics = pConsumer->currentTopics;
int32_t sz = taosArrayGetSize(pTopics); int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
...@@ -308,13 +307,16 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -308,13 +307,16 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
} }
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
void *abuf = buf; ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = pConsumer->epoch;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqCMGetSubEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
......
...@@ -72,7 +72,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -72,7 +72,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
if (pRaw == NULL) goto TOPIC_ENCODE_OVER; if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
...@@ -121,7 +121,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -121,7 +121,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
int32_t len; int32_t len;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
...@@ -206,7 +206,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { ...@@ -206,7 +206,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0}; SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TABLE_FNAME_LEN] = {0}; char db[TSDB_TOPIC_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db); tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
...@@ -223,7 +223,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq ...@@ -223,7 +223,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
pDrop->head.contLen = htonl(contLen); pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId); pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN); memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN);
pDrop->tuid = htobe64(pTopic->uid); pDrop->tuid = htobe64(pTopic->uid);
return pDrop; return pDrop;
...@@ -343,6 +343,7 @@ CREATE_TOPIC_OVER: ...@@ -343,6 +343,7 @@ CREATE_TOPIC_OVER:
} }
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
// TODO: cannot drop when subscribed
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
...@@ -408,75 +409,34 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { ...@@ -408,75 +409,34 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) {
return 0; return 0;
} }
static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
STableInfoReq infoReq = {0};
if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
mDebug("topic:%s, start to retrieve meta", infoReq.tbName);
#if 0 #if 0
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1; return -1;
} }
STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname); int32_t numOfTopics = 0;
if (pTopic == NULL) { void *pIter = NULL;
mndReleaseDb(pMnode, pDb); while (1) {
terrno = TSDB_CODE_MND_INVALID_TOPIC; SMqTopicObj *pTopic = NULL;
mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
return -1; if (pIter == NULL) break;
}
taosRLockLatch(&pTopic->lock); if (pTopic->dbUid == pDb->uid) {
int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags; numOfTopics++;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); }
STableMetaRsp *pMeta = rpcMallocCont(contLen); sdbRelease(pSdb, pTopic);
if (pMeta == NULL) {
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
} }
memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN); *pNumOfTopics = numOfTopics;
pMeta->numOfTags = htonl(pTopic->numOfTags);
pMeta->numOfColumns = htonl(pTopic->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pTopic->version);
pMeta->tuid = htonl(pTopic->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchemas[i];
SSchema *pSrcSchema = &pTopic->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
pReq->pCont = pMeta;
pReq->contLen = contLen;
mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags);
#endif
return 0; return 0;
} }
#endif
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -571,7 +531,7 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in ...@@ -571,7 +531,7 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
if (pTopic->dbUid != pDb->uid) { if (pTopic->dbUid != pDb->uid) {
if (strncmp(pTopic->name, prefix, prefixLen) != 0) { if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
} }
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
...@@ -580,8 +540,8 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in ...@@ -580,8 +540,8 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
cols = 0; cols = 0;
char topicName[TSDB_TABLE_NAME_LEN] = {0}; char topicName[TSDB_TOPIC_NAME_LEN] = {0};
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN); tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, topicName); STR_TO_VARSTR(pWrite, topicName);
cols++; cols++;
......
...@@ -51,7 +51,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl ...@@ -51,7 +51,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
void tqClose(STQ*); void tqClose(STQ*);
// required by vnode // required by vnode
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
......
...@@ -79,19 +79,19 @@ extern int32_t tqDebugFlag; ...@@ -79,19 +79,19 @@ extern int32_t tqDebugFlag;
// 4096 - 4080 // 4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16 #define TQ_IDX_PAGE_HEAD_SIZE 16
#define TQ_ACTION_CONST 0 #define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2 #define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3 #define TQ_ACTION_INTXN 3
#define TQ_SVER 0 #define TQ_SVER 0
// TODO: inplace mode is not implemented // TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0 #define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1 #define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0 #define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2 #define TQ_DUP_INTXN_REJECT 2
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
...@@ -160,7 +160,7 @@ struct STQ { ...@@ -160,7 +160,7 @@ struct STQ {
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
SWal* pWal; SWal* pWal;
SMeta* pMeta; SMeta* pVnodeMeta;
}; };
typedef struct { typedef struct {
...@@ -190,9 +190,6 @@ typedef struct { ...@@ -190,9 +190,6 @@ typedef struct {
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
char* qmsg; char* qmsg;
int64_t persistedOffset;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer; STqBuffer buffer;
SWalReadHandle* pReadhandle; SWalReadHandle* pReadhandle;
} STqTopic; } STqTopic;
...@@ -201,7 +198,7 @@ typedef struct { ...@@ -201,7 +198,7 @@ typedef struct {
int64_t consumerId; int64_t consumerId;
int64_t epoch; int64_t epoch;
char cgroup[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_TOPIC_FNAME_LEN];
SArray* topics; // SArray<STqTopicHandle> SArray* topics; // SArray<STqTopic>
} STqConsumer; } STqConsumer;
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
......
...@@ -42,7 +42,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl ...@@ -42,7 +42,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pMeta = pMeta; pTq->pVnodeMeta = pMeta;
#if 0 #if 0
pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
...@@ -71,9 +71,11 @@ void tqClose(STQ* pTq) { ...@@ -71,9 +71,11 @@ void tqClose(STQ* pTq) {
// TODO // TODO
} }
int tqPushMsg(STQ* pTq, void* p, int64_t version) { int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
// add reference // TODO: add reference
// judge and launch new query // if handle waiting, launch query and response to consumer
//
// if no waiting handle, return
return 0; return 0;
} }
...@@ -101,9 +103,9 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) ...@@ -101,9 +103,9 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic)
/*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/ /*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
/*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/ /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
tlen += taosEncodeString(buf, pTopic->qmsg); tlen += taosEncodeString(buf, pTopic->qmsg);
tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset); /*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/
tlen += taosEncodeFixedI64(buf, pTopic->committedOffset); /*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/
tlen += taosEncodeFixedI64(buf, pTopic->currentOffset); /*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/
return tlen; return tlen;
} }
...@@ -113,9 +115,9 @@ static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopi ...@@ -113,9 +115,9 @@ static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopi
/*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/ /*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
/*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/ /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
buf = taosDecodeString(buf, &pTopic->qmsg); buf = taosDecodeString(buf, &pTopic->qmsg);
buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset); /*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/
buf = taosDecodeFixedI64(buf, &pTopic->committedOffset); /*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/
buf = taosDecodeFixedI64(buf, &pTopic->currentOffset); /*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/
return buf; return buf;
} }
...@@ -194,8 +196,8 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -194,8 +196,8 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
} }
for (int j = 0; j < TQ_BUFFER_SIZE; j++) { for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
pTopic->buffer.output[j].status = 0; pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta};
pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
} }
...@@ -218,7 +220,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -218,7 +220,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffset = pReq->currentOffset + 1; fetchOffset = pReq->currentOffset + 1;
} }
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; SMqConsumeRsp rsp = {
.consumerId = consumerId,
.numOfTopics = 0,
.pBlockData = NULL,
};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
...@@ -243,7 +249,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -243,7 +249,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// TODO: no more log, set timer to wait blocking time // TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// rsponse to user // response to user
break; break;
} }
pHead = pTopic->pReadhandle->pHead; pHead = pTopic->pReadhandle->pHead;
...@@ -268,19 +274,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -268,19 +274,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
taosArrayPush(pRes, pDataBlock); taosArrayPush(pRes, pDataBlock);
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
rsp.numOfTopics = 1; rsp.numOfTopics = 1;
rsp.pBlockData = pRes; rsp.pBlockData = pRes;
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
return -1; return -1;
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
void* abuf = buf; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqConsumeRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pMsg->pCont = buf; pMsg->pCont = buf;
...@@ -295,14 +302,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -295,14 +302,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
return -1; return -1;
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
void* abuf = buf; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqConsumeRsp(&abuf, &rsp);
rsp.pBlockData = NULL; rsp.pBlockData = NULL;
pMsg->pCont = buf; pMsg->pCont = buf;
...@@ -312,158 +321,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -312,158 +321,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
#if 0
int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset = pReq->offset;
int64_t blockingTime = pReq->blockingTime;
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
/*printf("vg %d get consume req\n", pReq->head.vgId);*/
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
rpcSendResponse(pMsg);
return 0;
}
int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) {
STqTopic* pTopic = taosArrayGet(pConsumer->topics, i);
// TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
ASSERT(false);
continue;
}
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset - 1;
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
if (fetchOffset <= pTopic->committedOffset) {
fetchOffset = pTopic->committedOffset + 1;
}
/*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/
int8_t pos;
int8_t skip = 0;
SWalHead* pHead;
while (1) {
pos = fetchOffset % TQ_BUFFER_SIZE;
skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1);
if (skip == 1) {
// do nothing
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
// read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
}
rsp.skipLogNum++;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
}
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
fetchOffset++;
}
if (skip == 1) continue;
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
printf("current fetch offset %ld\n", fetchOffset);
qSetStreamInput(task, pCont);
// SArray<SSDataBlock>
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
break;
}
if (pDataBlock != NULL) {
taosArrayPush(pRes, pDataBlock);
} else {
break;
}
}
// TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
fetchOffset++;
continue;
} else {
rsp.numOfTopics++;
}
rsp.pBlockData = pRes;
#if 0
pTopic->buffer.output[pos].dst = pRes;
if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
pTopic->buffer.firstOffset = pReq->offset;
}
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset;
}
#endif
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
if (rsp.pBlockData) {
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
rsp.pBlockData = NULL;
}
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
#endif
int32_t tqProcessRebReq(STQ* pTq, char* msg) { int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0}; SMqMVRebReq req = {0};
tDecodeSMqMVRebReq(msg, &req); tDecodeSMqMVRebReq(msg, &req);
...@@ -505,8 +362,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -505,8 +362,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->logicalPlan = req.logicalPlan; pTopic->logicalPlan = req.logicalPlan;
pTopic->physicalPlan = req.physicalPlan; pTopic->physicalPlan = req.physicalPlan;
pTopic->qmsg = req.qmsg; pTopic->qmsg = req.qmsg;
pTopic->committedOffset = -1; /*pTopic->committedOffset = -1;*/
pTopic->currentOffset = -1; /*pTopic->currentOffset = -1;*/
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
...@@ -516,8 +373,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -516,8 +373,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
} }
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta};
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
} }
......
...@@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// todo: change the interface here // todo: change the interface here
int64_t ver; int64_t ver;
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) {
// TODO: handle error // TODO: handle error
} }
......
...@@ -196,9 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { ...@@ -196,9 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
char *mode = NULL; char *mode = NULL;
if (tdFileOptions & TD_FILE_APPEND) { if (tdFileOptions & TD_FILE_APPEND) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+"; mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+";
}else if (tdFileOptions & TD_FILE_TRUNC) { } else if (tdFileOptions & TD_FILE_TRUNC) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+"; mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+";
}else { } else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb";
} else {
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+"; mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+";
} }
assert(!(tdFileOptions & TD_FILE_EXCL)); assert(!(tdFileOptions & TD_FILE_EXCL));
...@@ -635,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { ...@@ -635,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
} }
assert(pFile->fp != NULL); assert(pFile->fp != NULL);
char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0};
va_list ap; va_list ap;
va_start(ap, format); va_start(ap, format);
vfprintf(pFile->fp, format, ap); vfprintf(pFile->fp, format, ap);
...@@ -673,11 +675,11 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) { ...@@ -673,11 +675,11 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) {
size_t len = 0; size_t len = 0;
return getline(ptrBuf, &len, pFile->fp); return getline(ptrBuf, &len, pFile->fp);
} }
int32_t taosEOFFile(TdFilePtr pFile) { int32_t taosEOFFile(TdFilePtr pFile) {
if (pFile == NULL) { if (pFile == NULL) {
return 0; return 0;
} }
assert(pFile->fp != NULL); assert(pFile->fp != NULL);
return feof(pFile->fp); return feof(pFile->fp);
} }
\ No newline at end of file
...@@ -41,9 +41,11 @@ void *cancelHandler(void *arg) { ...@@ -41,9 +41,11 @@ void *cancelHandler(void *arg) {
taosReleaseRef(tscObjRef, rid); taosReleaseRef(tscObjRef, rid);
#endif #endif
#else #else
reset_terminal_mode();
printf("\nReceive ctrl+c or other signal, quit shell.\n"); printf("\nReceive ctrl+c or other signal, quit shell.\n");
exit(0); exit(0);
#endif #endif
reset_terminal_mode();
printf("\nReceive ctrl+c or other signal, quit shell.\n"); printf("\nReceive ctrl+c or other signal, quit shell.\n");
exit(0); exit(0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册