提交 0f7829c0 编写于 作者: L Liu Jicong

fix multiple polling

上级 bc360c07
......@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1,
HEARTBEAT_TYPE_QUERY,
// types can be added here
//
HEARTBEAT_TYPE_MAX
......
......@@ -20,10 +20,10 @@
extern "C" {
#endif
#include "tcommon.h"
#include "parser.h"
#include "query.h"
#include "taos.h"
#include "tcommon.h"
#include "tdef.h"
#include "tep.h"
#include "thash.h"
......@@ -47,12 +47,12 @@ extern "C" {
typedef struct SAppInstInfo SAppInstInfo;
typedef struct SHbConnInfo {
typedef struct {
void* param;
SClientHbReq* req;
} SHbConnInfo;
typedef struct SAppHbMgr {
typedef struct {
char* key;
// statistics
int32_t reportCnt;
......@@ -68,11 +68,11 @@ typedef struct SAppHbMgr {
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
typedef struct SClientHbMgr {
typedef struct {
int8_t inited;
// ctl
int8_t threadStop;
......@@ -108,13 +108,13 @@ typedef struct SHeartBeatInfo {
} SHeartBeatInfo;
struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceSummary summary;
SList* pConnList; // STscObj linked list
int64_t clusterId;
void* pTransporter;
struct SAppHbMgr* pAppHbMgr;
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceSummary summary;
SList* pConnList; // STscObj linked list
int64_t clusterId;
void* pTransporter;
SAppHbMgr* pAppHbMgr;
};
typedef struct SAppInfo {
......@@ -141,10 +141,6 @@ typedef struct STscObj {
SAppInstInfo* pAppInfo;
} STscObj;
typedef struct SMqConsumer {
STscObj* pTscObj;
} SMqConsumer;
typedef struct SReqResultInfo {
const char* pRspMsg;
const char* pData;
......
......@@ -23,7 +23,7 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
......@@ -104,7 +104,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType);
......@@ -163,7 +163,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRs
return TSDB_CODE_SUCCESS;
}
static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
if (code != 0) {
tfree(param);
......@@ -226,7 +226,11 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
db->vgVersion = htonl(db->vgVersion);
}
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
SKv kv = {
.key = HEARTBEAT_KEY_DBINFO,
.valueLen = sizeof(SDbVgVersion) * dbNum,
.value = dbs,
};
tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
......@@ -256,7 +260,11 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
stb->tversion = htons(stb->tversion);
}
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
SKv kv = {
.key = HEARTBEAT_KEY_STBINFO,
.valueLen = sizeof(SSTableMetaVersion) * stbNum,
.value = stbs,
};
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
......@@ -288,7 +296,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return TSDB_CODE_SUCCESS;
}
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {}
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
......@@ -396,7 +404,7 @@ static void *hbThreadFunc(void *param) {
free(buf);
break;
}
pInfo->fp = hbMqAsyncCallBack;
pInfo->fp = hbAsyncCallBack;
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
......@@ -448,7 +456,6 @@ static void hbStopThread() {
}
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
/*return NULL;*/
hbMgrInit();
SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
......@@ -506,7 +513,6 @@ void appHbMgrCleanup(void) {
}
int hbMgrInit() {
/*return 0;*/
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
......@@ -524,7 +530,7 @@ int hbMgrInit() {
}
void hbMgrCleanUp() {
return;
#if 0
hbStopThread();
// destroy all appHbMgr
......@@ -537,6 +543,7 @@ void hbMgrCleanUp() {
pthread_mutex_unlock(&clientHbMgr.lock);
clientHbMgr.appHbMgrs = NULL;
#endif
}
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
......@@ -563,9 +570,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
/*return 0;*/
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0};
SClientHbKey connKey = {
.connId = connId,
.hbType = HEARTBEAT_TYPE_QUERY,
};
SHbConnInfo info = {0};
switch (hbType) {
case HEARTBEAT_TYPE_QUERY: {
......@@ -586,7 +595,6 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
}
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
/*return;*/
int32_t code = 0;
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
......@@ -598,7 +606,6 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
int32_t valueLen) {
return 0;
// find req by connection id
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL);
......
......@@ -225,9 +225,9 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
......@@ -239,12 +239,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
}
}
pRequest->code = res.code;
return pRequest->code;
}
......@@ -435,7 +435,11 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
}
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
SDataBuf buf = {
.len = pMsg->contLen,
.pData = NULL,
.handle = pMsg->handle,
};
if (pMsg->contLen > 0) {
buf.pData = calloc(1, pMsg->contLen);
......
......@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "tdef.h"
#include "tname.h"
#include "clientInt.h"
#include "clientLog.h"
#include "catalog.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
......@@ -81,13 +81,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->msgType = pRequest->type;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->msgType = pRequest->type;
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
......@@ -103,6 +103,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
} else {
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
if (pFetchMsg == NULL) {
free(pMsgSendInfo);
return NULL;
}
......@@ -118,7 +119,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
}
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
? genericRspCallback
: handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo;
}
......@@ -132,7 +135,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SShowRsp showRsp = {0};
tDeserializeSShowRsp(pMsg->pData, pMsg->len, &showRsp);
STableMetaRsp *pMetaMsg = &showRsp.tableMeta;
STableMetaRsp* pMetaMsg = &showRsp.tableMeta;
tfree(pRequest->body.resInfo.pRspMsg);
pRequest->body.resInfo.pRspMsg = pMsg->pData;
......@@ -158,7 +161,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if (pRequest->type == TDMT_VND_SHOW_TABLES) {
SShowReqInfo* pShowInfo = &pRequest->body.showInfo;
int32_t index = pShowInfo->currentIndex;
int32_t index = pShowInfo->currentIndex;
SVgroupInfo* pInfo = taosArrayGet(pShowInfo->pArray, index);
pShowInfo->vgId = pInfo->vgId;
}
......@@ -168,8 +171,8 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj *pRequest = param;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
SRequestObj* pRequest = param;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
tfree(pResInfo->pRspMsg);
if (code != TSDB_CODE_SUCCESS) {
......@@ -180,19 +183,19 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)pMsg->pData;
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision);
pResInfo->pRspMsg = pMsg->pData;
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pRetrieve->numOfRows;
pResInfo->pData = pRetrieve->data;
pResInfo->pData = pRetrieve->data;
pResInfo->completed = pRetrieve->completed;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
tscDebug("0x%" PRIx64 " numOfRows:%d, complete:%d, qId:0x%" PRIx64, pRequest->self, pRetrieve->numOfRows,
pRetrieve->completed, pRequest->body.showInfo.execId);
tsem_post(&pRequest->body.rspSem);
......@@ -213,20 +216,20 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
pResInfo->pRspMsg = pMsg->pData;
pResInfo->pRspMsg = pMsg->pData;
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData;
pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows);
pFetchRsp->precision = htons(pFetchRsp->precision);
SVShowTablesFetchRsp* pFetchRsp = (SVShowTablesFetchRsp*)pMsg->pData;
pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows);
pFetchRsp->precision = htons(pFetchRsp->precision);
pResInfo->pRspMsg = pMsg->pData;
pResInfo->pRspMsg = pMsg->pData;
pResInfo->numOfRows = pFetchRsp->numOfRows;
pResInfo->pData = pFetchRsp->data;
pResInfo->pData = pFetchRsp->data;
pResInfo->current = 0;
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
tscDebug("0x%" PRIx64 " numOfRows:%d, complete:%d, qId:0x%" PRIx64, pRequest->self, pFetchRsp->numOfRows,
pFetchRsp->completed, pRequest->body.showInfo.execId);
tsem_post(&pRequest->body.rspSem);
......@@ -254,7 +257,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
SName name = {0};
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
tFreeSUsedbRsp(&usedbRsp);
......@@ -377,14 +380,14 @@ void initMsgHandleFp() {
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = processRetrieveMnodeRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
}
......@@ -28,11 +28,6 @@
#include "tqueue.h"
#include "tref.h"
static int64_t perfWrite;
static int64_t perfRead;
static int64_t perfRead2;
static int64_t perfRead3;
struct tmq_list_t {
int32_t cnt;
int32_t tot;
......@@ -64,6 +59,7 @@ struct tmq_t {
char groupId[256];
char clientId[256];
int8_t autoCommit;
int8_t inWaiting;
int64_t consumerId;
int32_t epoch;
int32_t resetOffsetCfg;
......@@ -76,6 +72,7 @@ struct tmq_t {
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t
STaosQall* qall;
tsem_t rspSem;
// stat
int64_t pollCnt;
};
......@@ -214,9 +211,9 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
if (pParam->tmq->commit_cb) {
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
}
if (!pParam->async) tsem_post(&pParam->rspSem);
return 0;
......@@ -244,6 +241,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return NULL;
}
pTmq->pTscObj = (STscObj*)conn;
pTmq->inWaiting = 0;
pTmq->status = 0;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
......@@ -256,6 +254,8 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
......@@ -641,16 +641,13 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("msg discard\n");
if (pParam->epoch == tmq->epoch) {
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
return 0;
goto WRITE_QUEUE_FAIL;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
tsem_post(&tmq->rspSem);
printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
return 0;
}
......@@ -660,6 +657,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
} else {
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
}
#if 0
if (pParam->sync == 1) {
/**pParam->msg = malloc(sizeof(tmq_message_t));*/
*pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
......@@ -669,45 +668,41 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
}
int64_t begin = clock();
taosWriteQitem(tmq->mqueue, *pParam->msg);
perfWrite += clock() - begin;
tsem_post(&pParam->rspSem);
return 0;
}
tsem_post(&pParam->rspSem);
return -1;
}
#endif
/*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) {
printf("fail\n");
return -1;
goto WRITE_QUEUE_FAIL;
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->consumeRsp.numOfTopics == 0) {
/*printf("no data\n");*/
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosFreeQitem(pRsp);
return 0;
goto WRITE_QUEUE_FAIL;
}
pRsp->extra = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp);
atomic_add_fetch_32(&tmq->readyRequest, 1);
/*printf("poll in queue\n");*/
/*pParam->rspMsg = (tmq_message_t*)pRsp;*/
/*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
/*printf("\n-----msg end------\n");*/
tsem_post(&tmq->rspSem);
return 0;
WRITE_QUEUE_FAIL:
if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
tsem_post(&tmq->rspSem);
return code;
}
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
......@@ -744,81 +739,94 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
if (pParam->sync) {
tsem_post(&pParam->rspSem);
}
return 0;
goto END;
}
tscDebug("tmq ask ep cb called");
// tmq's epoch is monotomically increase,
// so it's safe to discard any old epoch msg.
// epoch will only increase when received newer epoch ep msg
SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch <= epoch) {
goto END;
}
if (pParam->sync) {
SMqRspHead* head = pMsg->pData;
SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) {
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
tsem_post(&pParam->rspSem);
tDeleteSMqCMGetSubEpRsp(&rsp);
} else {
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
code = -1;
goto END;
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp);
/*taosWriteQitem(tmq->mqueue, pRsp);*/
taosWriteQitem(tmq->mqueue, pRsp);
tsem_post(&tmq->rspSem);
}
return 0;
END:
if (pParam->sync) {
tsem_post(&pParam->rspSem);
}
return code;
}
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) {
SMqCMGetSubEpReq* req = malloc(tlen);
if (req == NULL) {
tscError("failed to malloc get subscribe ep buf");
goto END;
}
buf->consumerId = htobe64(tmq->consumerId);
buf->epoch = htonl(tmq->epoch);
strcpy(buf->cgroup, tmq->groupId);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) {
tscError("failed to malloc subscribe ep request");
goto END;
return -1;
}
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
req->consumerId = htobe64(tmq->consumerId);
req->epoch = htonl(tmq->epoch);
strcpy(req->cgroup, tmq->groupId);
SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
goto END;
free(req);
return -1;
}
pParam->tmq = tmq;
pParam->sync = sync;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem);
free(pParam);
free(req);
return -1;
}
sendInfo->msgInfo = (SDataBuf){
.pData = req,
.len = tlen,
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
sendInfo->msgType = TDMT_MND_GET_SUB_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
END:
if (sync) tsem_wait(&pParam->rspSem);
return 0;
}
......@@ -942,19 +950,14 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
pVg->pollCnt++;
tmq->pollCnt++;
int64_t begin = clock();
tsem_wait(&param->rspSem);
perfRead3 += clock() - begin;
tmq_message_t* nmsg = NULL;
while (1) {
int64_t begin1 = clock();
taosReadQitem(tmq->mqueue, (void**)&nmsg);
perfRead2 += clock() - begin1;
if (nmsg == NULL) continue;
/*while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {*/
/*taosReadQitem(tmq->mqueue, (void**)&nmsg);*/
/*}*/
perfRead += clock() - begin;
while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
}
return nmsg;
}
}
......@@ -976,12 +979,14 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
tsem_post(&tmq->rspSem);
return -1;
}
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
if (param == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
tsem_post(&tmq->rspSem);
return -1;
}
param->tmq = tmq;
......@@ -1033,7 +1038,9 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
tmq_message_t* rspMsg = NULL;
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
break;
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) return NULL;
}
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
......@@ -1060,10 +1067,9 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
}
}
}
return NULL;
}
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
......@@ -1079,10 +1085,6 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
printf("perf write %f\n", (double)perfWrite / CLOCKS_PER_SEC);
printf("perf read %f\n", (double)perfRead / CLOCKS_PER_SEC);
printf("perf read2 %f\n", (double)perfRead2 / CLOCKS_PER_SEC);
printf("perf read3 %f\n", (double)perfRead3 / CLOCKS_PER_SEC);
return NULL;
}
} else
......@@ -1090,44 +1092,37 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
}
}
tmq_message_t* tmq_consumer_poll_v0(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL;
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg;
int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
#if 0
taosGetQitem(tmq->qall, (void**)&rspMsg);
if (rspMsg == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
}
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspMsg) {
return rspMsg;
}
#endif
while (1) {
/*printf("cycle\n");*/
if (atomic_load_32(&tmq->waitingRequest) == 0) {
tmqPollImpl(tmq, blocking_time);
}
while (atomic_load_32(&tmq->readyRequest) == 0) {
sched_yield();
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
return NULL;
}
}
}
taosReadAllQitems(tmq->mqueue, tmq->qall);
rspMsg = tmqHandleAllRsp(tmq, blocking_time, true);
tsem_wait(&tmq->rspSem);
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspMsg) {
return rspMsg;
}
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
return NULL;
}
}
}
}
......@@ -1277,6 +1272,7 @@ const char* tmq_err2str(tmq_resp_err_t err) {
}
return "fail";
}
#if 0
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = malloc(sizeof(tmq_t));
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tmsg.h"
#include "query.h"
#include "tglobal.h"
#include "tsched.h"
#include "tmsg.h"
#include "trpc.h"
#include "tsched.h"
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
static struct SSchema _s = {
.colId = TSDB_TBNAME_COLUMN_INDEX,
.type = TSDB_DATA_TYPE_BINARY,
.type = TSDB_DATA_TYPE_BINARY,
.bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
.name = "tbname",
};
const SSchema* tGetTbnameColumnSchema() {
return &_s;
}
const SSchema* tGetTbnameColumnSchema() { return &_s; }
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
int32_t rowLen = 0;
......@@ -87,7 +100,7 @@ int32_t initTaskQueue() {
double factor = 4.0;
int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2;
pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc");
if (NULL == pTaskQueue) {
......@@ -96,19 +109,21 @@ int32_t initTaskQueue() {
}
qDebug("task queue is initialized, numOfThreads: %d", numOfThreads);
return 0;
}
int32_t cleanupTaskQueue() {
taosCleanUpScheduler(pTaskQueue);
return 0;
}
static void execHelper(struct SSchedMsg* pSchedMsg) {
assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);
__async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*) pSchedMsg->msg = code;
*(int32_t*)pSchedMsg->msg = code;
}
}
......@@ -116,34 +131,33 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
assert(execFn != NULL);
SSchedMsg schedMsg = {0};
schedMsg.fp = execHelper;
schedMsg.fp = execHelper;
schedMsg.ahandle = execFn;
schedMsg.thandle = execParam;
schedMsg.msg = code;
schedMsg.msg = code;
taosScheduleTask(pTaskQueue, &schedMsg);
return 0;
}
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
char *pMsg = rpcMallocCont(pInfo->msgInfo.len);
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) {
qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return terrno;
}
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
SRpcMsg rpcMsg = {
.msgType = pInfo->msgType,
.pCont = pMsg,
.contLen = pInfo->msgInfo.len,
.ahandle = (void*) pInfo,
.handle = pInfo->msgInfo.handle,
.code = 0
};
SRpcMsg rpcMsg = {.msgType = pInfo->msgType,
.pCont = pMsg,
.contLen = pInfo->msgInfo.len,
.ahandle = (void*)pInfo,
.handle = pInfo->msgInfo.handle,
.code = 0};
assert(pInfo->fp != NULL);
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册