diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b60f09bdf40e8e4694b2717bc023bffabc02aab5..ae594e90ca32184c65634999a67f9cfc07911be1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; typedef enum { HEARTBEAT_TYPE_MQ = 0, - HEARTBEAT_TYPE_QUERY = 1, + HEARTBEAT_TYPE_QUERY, // types can be added here // HEARTBEAT_TYPE_MAX diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 79909a569606758f9adc33a036881830fab6ffa7..ad018c68329c9f70c73bef06858b6da76edf1d6b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -20,10 +20,10 @@ extern "C" { #endif -#include "tcommon.h" #include "parser.h" #include "query.h" #include "taos.h" +#include "tcommon.h" #include "tdef.h" #include "tep.h" #include "thash.h" @@ -47,12 +47,12 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; -typedef struct SHbConnInfo { +typedef struct { void* param; SClientHbReq* req; } SHbConnInfo; -typedef struct SAppHbMgr { +typedef struct { char* key; // statistics int32_t reportCnt; @@ -68,11 +68,11 @@ typedef struct SAppHbMgr { SHashObj* connInfo; // hash } SAppHbMgr; -typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); +typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); -typedef struct SClientHbMgr { +typedef struct { int8_t inited; // ctl int8_t threadStop; @@ -108,13 +108,13 @@ typedef struct SHeartBeatInfo { } SHeartBeatInfo; struct SAppInstInfo { - int64_t numOfConns; - SCorEpSet mgmtEp; - SInstanceSummary summary; - SList* pConnList; // STscObj linked list - int64_t clusterId; - void* pTransporter; - struct SAppHbMgr* pAppHbMgr; + int64_t numOfConns; + SCorEpSet mgmtEp; + SInstanceSummary summary; + SList* pConnList; // STscObj linked list + int64_t clusterId; + void* pTransporter; + SAppHbMgr* pAppHbMgr; }; typedef struct SAppInfo { @@ -141,10 +141,6 @@ typedef struct STscObj { SAppInstInfo* pAppInfo; } STscObj; -typedef struct SMqConsumer { - STscObj* pTscObj; -} SMqConsumer; - typedef struct SReqResultInfo { const char* pRspMsg; const char* pData; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e9e64a78b8f57e94ec81515a494bbf2b10970a94..354e82b622fb08e4d7d87f12d8e02ad0f9889ae9 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -23,7 +23,7 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } +static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -104,7 +104,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } -static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { +static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); @@ -163,7 +163,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRs return TSDB_CODE_SUCCESS; } -static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { +static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { tfree(param); @@ -226,7 +226,11 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl db->vgVersion = htonl(db->vgVersion); } - SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + SKv kv = { + .key = HEARTBEAT_KEY_DBINFO, + .valueLen = sizeof(SDbVgVersion) * dbNum, + .value = dbs, + }; tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); @@ -256,7 +260,11 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC stb->tversion = htons(stb->tversion); } - SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; + SKv kv = { + .key = HEARTBEAT_KEY_STBINFO, + .valueLen = sizeof(SSTableMetaVersion) * stbNum, + .value = stbs, + }; tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen); @@ -288,7 +296,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {} +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -396,7 +404,7 @@ static void *hbThreadFunc(void *param) { free(buf); break; } - pInfo->fp = hbMqAsyncCallBack; + pInfo->fp = hbAsyncCallBack; pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; @@ -448,7 +456,6 @@ static void hbStopThread() { } SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - /*return NULL;*/ hbMgrInit(); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -506,7 +513,6 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -524,7 +530,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - return; +#if 0 hbStopThread(); // destroy all appHbMgr @@ -537,6 +543,7 @@ void hbMgrCleanUp() { pthread_mutex_unlock(&clientHbMgr.lock); clientHbMgr.appHbMgrs = NULL; +#endif } int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { @@ -563,9 +570,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - /*return 0;*/ - SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; - SHbConnInfo info = {0}; + SClientHbKey connKey = { + .connId = connId, + .hbType = HEARTBEAT_TYPE_QUERY, + }; + SHbConnInfo info = {0}; switch (hbType) { case HEARTBEAT_TYPE_QUERY: { @@ -586,7 +595,6 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); @@ -598,7 +606,6 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, int32_t valueLen) { - return 0; // find req by connection id SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3c6a49c3db25cbcc4a7b5f76d6081728a5a6f129..e9049ff751bf4eb23ec1f80a2d7f630a77f175a7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -225,9 +225,9 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { - void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; + void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); + int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.pQueryJob != NULL) { schedulerFreeJob(pRequest->body.pQueryJob); @@ -239,12 +239,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; - + if (pRequest->body.pQueryJob != NULL) { schedulerFreeJob(pRequest->body.pQueryJob); } } - + pRequest->code = res.code; return pRequest->code; } @@ -435,7 +435,11 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } - SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle}; + SDataBuf buf = { + .len = pMsg->contLen, + .pData = NULL, + .handle = pMsg->handle, + }; if (pMsg->contLen > 0) { buf.pData = calloc(1, pMsg->contLen); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 454fb7f1fea694ca61cc83af4458bd262a241567..4f18a08c95a31f45cc6e68ef82f4a9ab934fbf78 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ +#include "catalog.h" +#include "clientInt.h" +#include "clientLog.h" #include "os.h" #include "tdef.h" #include "tname.h" -#include "clientInt.h" -#include "clientLog.h" -#include "catalog.h" int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); @@ -81,13 +81,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { +SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->param = pRequest; - pMsgSendInfo->msgType = pRequest->type; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->param = pRequest; + pMsgSendInfo->msgType = pRequest->type; if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { @@ -103,6 +103,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { } else { SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); if (pFetchMsg == NULL) { + free(pMsgSendInfo); return NULL; } @@ -118,7 +119,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { pMsgSendInfo->msgInfo = pRequest->body.requestMsg; } - pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; + pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL) + ? genericRspCallback + : handleRequestRspFp[TMSG_INDEX(pRequest->type)]; return pMsgSendInfo; } @@ -132,7 +135,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { SShowRsp showRsp = {0}; tDeserializeSShowRsp(pMsg->pData, pMsg->len, &showRsp); - STableMetaRsp *pMetaMsg = &showRsp.tableMeta; + STableMetaRsp* pMetaMsg = &showRsp.tableMeta; tfree(pRequest->body.resInfo.pRspMsg); pRequest->body.resInfo.pRspMsg = pMsg->pData; @@ -158,7 +161,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (pRequest->type == TDMT_VND_SHOW_TABLES) { SShowReqInfo* pShowInfo = &pRequest->body.showInfo; - int32_t index = pShowInfo->currentIndex; + int32_t index = pShowInfo->currentIndex; SVgroupInfo* pInfo = taosArrayGet(pShowInfo->pArray, index); pShowInfo->vgId = pInfo->vgId; } @@ -168,8 +171,8 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) { - SRequestObj *pRequest = param; - SReqResultInfo *pResInfo = &pRequest->body.resInfo; + SRequestObj* pRequest = param; + SReqResultInfo* pResInfo = &pRequest->body.resInfo; tfree(pResInfo->pRspMsg); if (code != TSDB_CODE_SUCCESS) { @@ -180,19 +183,19 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) assert(pMsg->len >= sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData; - pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); - pRetrieve->precision = htons(pRetrieve->precision); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)pMsg->pData; + pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); + pRetrieve->precision = htons(pRetrieve->precision); - pResInfo->pRspMsg = pMsg->pData; + pResInfo->pRspMsg = pMsg->pData; pResInfo->numOfRows = pRetrieve->numOfRows; - pResInfo->pData = pRetrieve->data; + pResInfo->pData = pRetrieve->data; pResInfo->completed = pRetrieve->completed; pResInfo->current = 0; setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); - tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, + tscDebug("0x%" PRIx64 " numOfRows:%d, complete:%d, qId:0x%" PRIx64, pRequest->self, pRetrieve->numOfRows, pRetrieve->completed, pRequest->body.showInfo.execId); tsem_post(&pRequest->body.rspSem); @@ -213,20 +216,20 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pMsg->len >= sizeof(SRetrieveTableRsp)); - pResInfo->pRspMsg = pMsg->pData; + pResInfo->pRspMsg = pMsg->pData; - SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData; - pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows); - pFetchRsp->precision = htons(pFetchRsp->precision); + SVShowTablesFetchRsp* pFetchRsp = (SVShowTablesFetchRsp*)pMsg->pData; + pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows); + pFetchRsp->precision = htons(pFetchRsp->precision); - pResInfo->pRspMsg = pMsg->pData; + pResInfo->pRspMsg = pMsg->pData; pResInfo->numOfRows = pFetchRsp->numOfRows; - pResInfo->pData = pFetchRsp->data; + pResInfo->pData = pFetchRsp->data; pResInfo->current = 0; setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); - tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows, + tscDebug("0x%" PRIx64 " numOfRows:%d, complete:%d, qId:0x%" PRIx64, pRequest->self, pFetchRsp->numOfRows, pFetchRsp->completed, pRequest->body.showInfo.execId); tsem_post(&pRequest->body.rspSem); @@ -254,7 +257,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); SName name = {0}; - tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB); + tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); tFreeSUsedbRsp(&usedbRsp); @@ -377,14 +380,14 @@ void initMsgHandleFp() { tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp; #endif - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)] = processShowRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)] = processShowRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = processRetrieveMnodeRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp; + handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0e84371199d62ae2050dfa813413cc0c7d4d18a4..8aca3d6f6609b4728a0b6ab6c5a13e86cea7708d 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -28,11 +28,6 @@ #include "tqueue.h" #include "tref.h" -static int64_t perfWrite; -static int64_t perfRead; -static int64_t perfRead2; -static int64_t perfRead3; - struct tmq_list_t { int32_t cnt; int32_t tot; @@ -64,6 +59,7 @@ struct tmq_t { char groupId[256]; char clientId[256]; int8_t autoCommit; + int8_t inWaiting; int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; @@ -76,6 +72,7 @@ struct tmq_t { SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; + tsem_t rspSem; // stat int64_t pollCnt; }; @@ -214,9 +211,9 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; - tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; if (pParam->tmq->commit_cb) { - pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); + pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL); } if (!pParam->async) tsem_post(&pParam->rspSem); return 0; @@ -244,6 +241,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return NULL; } pTmq->pTscObj = (STscObj*)conn; + pTmq->inWaiting = 0; pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; @@ -256,6 +254,8 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; + tsem_init(&pTmq->rspSem, 0, 0); + pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); @@ -641,16 +641,13 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); - if (pParam->epoch == tmq->epoch) { - atomic_sub_fetch_32(&tmq->waitingRequest, 1); - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } - return 0; + goto WRITE_QUEUE_FAIL; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t tmqEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < tmqEpoch) { + tsem_post(&tmq->rspSem); printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); return 0; } @@ -660,6 +657,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } else { atomic_sub_fetch_32(&tmq->waitingRequest, 1); } + +#if 0 if (pParam->sync == 1) { /**pParam->msg = malloc(sizeof(tmq_message_t));*/ *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t)); @@ -669,45 +668,41 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if ((*pParam->msg)->consumeRsp.numOfTopics != 0) { pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset; } - int64_t begin = clock(); taosWriteQitem(tmq->mqueue, *pParam->msg); - perfWrite += clock() - begin; tsem_post(&pParam->rspSem); return 0; } tsem_post(&pParam->rspSem); return -1; } +#endif /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { - printf("fail\n"); - return -1; + goto WRITE_QUEUE_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { /*printf("no data\n");*/ - if (pParam->epoch == tmq->epoch) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } taosFreeQitem(pRsp); - return 0; + goto WRITE_QUEUE_FAIL; } + pRsp->extra = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); atomic_add_fetch_32(&tmq->readyRequest, 1); - - /*printf("poll in queue\n");*/ - /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ - /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ - - /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ - /*printf("-----msg begin----\n");*/ - /*printf("\n-----msg end------\n");*/ + tsem_post(&tmq->rspSem); return 0; + +WRITE_QUEUE_FAIL: + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + tsem_post(&tmq->rspSem); + return code; } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { @@ -744,81 +739,94 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tmq_t* tmq = pParam->tmq; if (code != 0) { printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); - if (pParam->sync) { - tsem_post(&pParam->rspSem); - } - return 0; + goto END; } - tscDebug("tmq ask ep cb called"); + + // tmq's epoch is monotomically increase, + // so it's safe to discard any old epoch msg. + // epoch will only increase when received newer epoch ep msg + SMqRspHead* head = pMsg->pData; + int32_t epoch = atomic_load_32(&tmq->epoch); + if (head->epoch <= epoch) { + goto END; + } + if (pParam->sync) { - SMqRspHead* head = pMsg->pData; SMqCMGetSubEpRsp rsp; tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - int32_t epoch = atomic_load_32(&tmq->epoch); - if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) { + if (tmqUpdateEp(tmq, head->epoch, &rsp)) { atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); } - tsem_post(&pParam->rspSem); tDeleteSMqCMGetSubEpRsp(&rsp); } else { tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = -1; + goto END; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); - /*taosWriteQitem(tmq->mqueue, pRsp);*/ + taosWriteQitem(tmq->mqueue, pRsp); + tsem_post(&tmq->rspSem); } - return 0; + +END: + if (pParam->sync) { + tsem_post(&pParam->rspSem); + } + return code; } int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t tlen = sizeof(SMqCMGetSubEpReq); - SMqCMGetSubEpReq* buf = malloc(tlen); - if (buf == NULL) { + SMqCMGetSubEpReq* req = malloc(tlen); + if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); - goto END; - } - buf->consumerId = htobe64(tmq->consumerId); - buf->epoch = htonl(tmq->epoch); - strcpy(buf->cgroup, tmq->groupId); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); - if (pRequest == NULL) { - tscError("failed to malloc subscribe ep request"); - goto END; + return -1; } - - pRequest->body.requestMsg = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; + req->consumerId = htobe64(tmq->consumerId); + req->epoch = htonl(tmq->epoch); + strcpy(req->cgroup, tmq->groupId); SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); - goto END; + free(req); + return -1; } pParam->tmq = tmq; pParam->sync = sync; tsem_init(&pParam->rspSem, 0, 0); - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + tsem_destroy(&pParam->rspSem); + free(pParam); + free(req); + return -1; + } + + sendInfo->msgInfo = (SDataBuf){ + .pData = req, + .len = tlen, + .handle = NULL, + }; + + sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqAskEpCb; + sendInfo->msgType = TDMT_MND_GET_SUB_EP; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); -END: if (sync) tsem_wait(&pParam->rspSem); return 0; } @@ -942,19 +950,14 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { pVg->pollCnt++; tmq->pollCnt++; - int64_t begin = clock(); tsem_wait(¶m->rspSem); - perfRead3 += clock() - begin; tmq_message_t* nmsg = NULL; while (1) { - int64_t begin1 = clock(); taosReadQitem(tmq->mqueue, (void**)&nmsg); - perfRead2 += clock() - begin1; if (nmsg == NULL) continue; - /*while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {*/ - /*taosReadQitem(tmq->mqueue, (void**)&nmsg);*/ - /*}*/ - perfRead += clock() - begin; + while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) { + taosReadQitem(tmq->mqueue, (void**)&nmsg); + } return nmsg; } } @@ -976,12 +979,14 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // TODO: out of mem + tsem_post(&tmq->rspSem); return -1; } SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); if (param == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // TODO: out of mem + tsem_post(&tmq->rspSem); return -1; } param->tmq = tmq; @@ -1033,7 +1038,9 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese tmq_message_t* rspMsg = NULL; taosGetQitem(tmq->qall, (void**)&rspMsg); if (rspMsg == NULL) { - break; + taosReadAllQitems(tmq->mqueue, tmq->qall); + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) return NULL; } if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { @@ -1060,10 +1067,9 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } } } - return NULL; } -tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { +tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg = NULL; int64_t startTime = taosGetTimestampMs(); @@ -1079,10 +1085,6 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { - printf("perf write %f\n", (double)perfWrite / CLOCKS_PER_SEC); - printf("perf read %f\n", (double)perfRead / CLOCKS_PER_SEC); - printf("perf read2 %f\n", (double)perfRead2 / CLOCKS_PER_SEC); - printf("perf read3 %f\n", (double)perfRead3 / CLOCKS_PER_SEC); return NULL; } } else @@ -1090,44 +1092,37 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } } -tmq_message_t* tmq_consumer_poll_v0(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* rspMsg = NULL; +tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* rspMsg; int64_t startTime = taosGetTimestampMs(); // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); -#if 0 - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) { - taosReadAllQitems(tmq->mqueue, tmq->qall); - } rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); if (rspMsg) { return rspMsg; } -#endif while (1) { /*printf("cycle\n");*/ if (atomic_load_32(&tmq->waitingRequest) == 0) { tmqPollImpl(tmq, blocking_time); } - while (atomic_load_32(&tmq->readyRequest) == 0) { - sched_yield(); - if (blocking_time != 0) { - int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { - return NULL; - } - } - } - taosReadAllQitems(tmq->mqueue, tmq->qall); - rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); + + tsem_wait(&tmq->rspSem); + + rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); if (rspMsg) { return rspMsg; } + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + return NULL; + } + } } } @@ -1277,6 +1272,7 @@ const char* tmq_err2str(tmq_resp_err_t err) { } return "fail"; } + #if 0 tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* pTmq = malloc(sizeof(tmq_t)); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index a2165453d55820078292eedb7085f7877f459092..63fbf59c064a91a883a0ae52ad7fb9e1c56b1c55 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -1,23 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + #include "os.h" -#include "tmsg.h" #include "query.h" #include "tglobal.h" -#include "tsched.h" +#include "tmsg.h" #include "trpc.h" +#include "tsched.h" -#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) -#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) +#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) +#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) static struct SSchema _s = { .colId = TSDB_TBNAME_COLUMN_INDEX, - .type = TSDB_DATA_TYPE_BINARY, + .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .name = "tbname", }; -const SSchema* tGetTbnameColumnSchema() { - return &_s; -} +const SSchema* tGetTbnameColumnSchema() { return &_s; } static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) { int32_t rowLen = 0; @@ -87,7 +100,7 @@ int32_t initTaskQueue() { double factor = 4.0; int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); - + int32_t queueSize = tsMaxConnections * 2; pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); if (NULL == pTaskQueue) { @@ -96,19 +109,21 @@ int32_t initTaskQueue() { } qDebug("task queue is initialized, numOfThreads: %d", numOfThreads); + return 0; } int32_t cleanupTaskQueue() { taosCleanUpScheduler(pTaskQueue); + return 0; } static void execHelper(struct SSchedMsg* pSchedMsg) { assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); - __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle; - int32_t code = execFn(pSchedMsg->thandle); + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + int32_t code = execFn(pSchedMsg->thandle); if (code != 0 && pSchedMsg->msg != NULL) { - *(int32_t*) pSchedMsg->msg = code; + *(int32_t*)pSchedMsg->msg = code; } } @@ -116,34 +131,33 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) assert(execFn != NULL); SSchedMsg schedMsg = {0}; - schedMsg.fp = execHelper; + schedMsg.fp = execHelper; schedMsg.ahandle = execFn; schedMsg.thandle = execParam; - schedMsg.msg = code; + schedMsg.msg = code; taosScheduleTask(pTaskQueue, &schedMsg); + return 0; } -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { - char *pMsg = rpcMallocCont(pInfo->msgInfo.len); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { - qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); + qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return terrno; } memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); - SRpcMsg rpcMsg = { - .msgType = pInfo->msgType, - .pCont = pMsg, - .contLen = pInfo->msgInfo.len, - .ahandle = (void*) pInfo, - .handle = pInfo->msgInfo.handle, - .code = 0 - }; + SRpcMsg rpcMsg = {.msgType = pInfo->msgType, + .pCont = pMsg, + .contLen = pInfo->msgInfo.len, + .ahandle = (void*)pInfo, + .handle = pInfo->msgInfo.handle, + .code = 0}; assert(pInfo->fp != NULL); rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +}