From 32e988766478b1caea3dab9883899860805a1c03 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 22 Apr 2022 19:47:00 +0800 Subject: [PATCH] feat(tmq): add push mode --- example/src/tmq.c | 2 - include/client/taos.h | 2 + include/os/osSemaphore.h | 45 +++--- include/util/tcache.h | 16 +- source/client/src/tmq.c | 12 +- source/dnode/mnode/impl/src/mndPerfSchema.c | 2 + source/dnode/mnode/impl/src/mndProfile.c | 65 ++++---- source/dnode/mnode/impl/src/mndShow.c | 2 +- source/dnode/mnode/impl/src/mndTopic.c | 4 +- source/dnode/vnode/src/inc/tq.h | 39 +++-- source/dnode/vnode/src/tq/tq.c | 167 +++++++++++++++++++- source/os/src/osSemaphore.c | 78 ++++++--- source/util/src/tcache.c | 150 +++++++++--------- source/util/test/cacheTest.cpp | 86 +++++----- 14 files changed, 444 insertions(+), 226 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 46f57799e7..2abf915fd8 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -101,8 +101,6 @@ int32_t create_topic() { } taos_free_result(pRes); - /*const char* sql = "select * from tu1";*/ - /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { diff --git a/include/client/taos.h b/include/client/taos.h index 3d139ce6d2..30e99e202c 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -245,6 +245,8 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *); +DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); +DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); #if 0 DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 594daf1bf3..21c88c9976 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -22,21 +22,28 @@ extern "C" { #include -#if defined (_TD_DARWIN_64) - typedef struct tsem_s *tsem_t; - int tsem_init(tsem_t *sem, int pshared, unsigned int value); - int tsem_wait(tsem_t *sem); - int tsem_post(tsem_t *sem); - int tsem_destroy(tsem_t *sem); +#if defined(_TD_DARWIN_64) + +typedef struct tsem_s *tsem_t; + +int tsem_init(tsem_t *sem, int pshared, unsigned int value); +int tsem_wait(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t nanosecs); +int tsem_post(tsem_t *sem); +int tsem_destroy(tsem_t *sem); + #else - #define tsem_t sem_t - #define tsem_init sem_init - int tsem_wait(tsem_t* sem); - #define tsem_post sem_post - #define tsem_destroy sem_destroy + +#define tsem_t sem_t +#define tsem_init sem_init +int tsem_wait(tsem_t *sem); +int tsem_timewait(tsem_t *sim, int64_t nanosecs); +#define tsem_post sem_post +#define tsem_destroy sem_destroy + #endif -#if defined (_TD_DARWIN_64) +#if defined(_TD_DARWIN_64) // #define TdThreadRwlock TdThreadMutex // #define taosThreadRwlockInit(lock, NULL) taosThreadMutexInit(lock, NULL) // #define taosThreadRwlockDestroy(lock) taosThreadMutexDestroy(lock) @@ -44,20 +51,20 @@ extern "C" { // #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock) // #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock) - #define TdThreadSpinlock TdThreadMutex - #define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL) - #define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock) - #define taosThreadSpinLock(lock) taosThreadMutexLock(lock) - #define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock) +#define TdThreadSpinlock TdThreadMutex +#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL) +#define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock) +#define taosThreadSpinLock(lock) taosThreadMutexLock(lock) +#define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock) #endif bool taosCheckPthreadValid(TdThread thread); int64_t taosGetSelfPthreadId(); int64_t taosGetPthreadId(TdThread thread); -void taosResetPthread(TdThread* thread); +void taosResetPthread(TdThread *thread); bool taosComparePthread(TdThread first, TdThread second); int32_t taosGetPId(); -int32_t taosGetAppName(char* name, int32_t* len); +int32_t taosGetAppName(char *name, int32_t *len); #ifdef __cplusplus } diff --git a/include/util/tcache.h b/include/util/tcache.h index b5c1578380..d8ab018570 100644 --- a/include/util/tcache.h +++ b/include/util/tcache.h @@ -47,13 +47,13 @@ typedef struct STrashElem STrashElem; /** * initialize the cache object * @param keyType key type - * @param refreshTimeInSeconds refresh operation interval time, the maximum survival time when one element is expired + * @param refreshTimeInMs refresh operation interval time, the maximum survival time when one element is expired * and not referenced by other objects * @param extendLifespan auto extend lifespan, if accessed * @param fn free resource callback function * @return */ -SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInMs, bool extendLifespan, __cache_free_fn_t fn, const char *cacheName); /** @@ -111,7 +111,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); * @param pCacheObj * @return */ -size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj); +size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj); /** * move all data node into trash, clear node in trash can if it is not referenced by any clients @@ -145,11 +145,11 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1); */ void taosStopCacheRefreshWorker(); -SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj); -bool taosCacheIterNext(SCacheIter* pIter); -void* taosCacheIterGetData(const SCacheIter* pIter, size_t* dataLen); -void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* keyLen); -void taosCacheDestroyIter(SCacheIter* pIter); +SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj); +bool taosCacheIterNext(SCacheIter *pIter); +void *taosCacheIterGetData(const SCacheIter *pIter, size_t *dataLen); +void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *keyLen); +void taosCacheDestroyIter(SCacheIter *pIter); #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 51e422f5c2..4dc0e6e5cf 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -260,6 +260,16 @@ void tmq_list_destroy(tmq_list_t* list) { taosArrayDestroy(container); } +int32_t tmq_list_get_size(const tmq_list_t* list) { + const SArray* container = &list->container; + return taosArrayGetSize(container); +} + +char** tmq_list_to_c_array(const tmq_list_t* list) { + const SArray* container = &list->container; + return container->pData; +} + static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } @@ -387,7 +397,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; - pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); + pTmq->consumerId = tGenIdPI64(); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); if (pTmq->clientTopics == NULL) { taosMemoryFree(pTmq); diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index fe96bcb709..737068a2dd 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -49,6 +49,7 @@ static const SPerfsTableSchema topicSchema[] = { static const SPerfsTableSchema consumerSchema[] = { {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, @@ -61,6 +62,7 @@ static const SPerfsTableSchema subscribeSchema[] = { {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, }; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 826a73afc6..fef57196f0 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -24,20 +24,20 @@ #include "version.h" typedef struct { - uint32_t id; - int8_t connType; - char user[TSDB_USER_LEN]; - char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc - int64_t appStartTimeMs; // app start time - int32_t pid; // pid of app that invokes taosc - uint32_t ip; - uint16_t port; - int8_t killed; - int64_t loginTimeMs; - int64_t lastAccessTimeMs; - uint64_t killId; - int32_t numOfQueries; - SArray *pQueries; //SArray + uint32_t id; + int8_t connType; + char user[TSDB_USER_LEN]; + char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc + int64_t appStartTimeMs; // app start time + int32_t pid; // pid of app that invokes taosc + uint32_t ip; + uint16_t port; + int8_t killed; + int64_t loginTimeMs; + int64_t lastAccessTimeMs; + uint64_t killId; + int32_t numOfQueries; + SArray *pQueries; // SArray } SConnObj; static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, @@ -58,7 +58,8 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); int32_t mndInitProfile(SMnode *pMnode) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - int32_t connCheckTime = tsShellActivityTimer * 2; + // in ms + int32_t connCheckTime = tsShellActivityTimer * 2 * 1000; pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn"); if (pMgmt->cache == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -71,9 +72,9 @@ int32_t mndInitProfile(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq); -// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); + // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); -// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); + // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery); return 0; @@ -91,7 +92,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType int32_t pid, const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - char connStr[255] = {0}; + char connStr[255] = {0}; int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); int32_t connId = mndGenerateUid(connStr, len); if (startTime == 0) startTime = taosGetTimestampMs(); @@ -253,8 +254,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { pConn->pQueries = pBasic->queryDesc; pBasic->queryDesc = NULL; - - pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; + + pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; return TSDB_CODE_SUCCESS; } @@ -324,9 +325,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { return NULL; } -static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) { +static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, + SClientHbBatchRsp *pBatchRsp) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; + SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; if (pHbReq->query) { SQueryHbReqBasic *pBasic = pHbReq->query; @@ -335,8 +337,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb rpcGetConnInfo(pMsg->handle, &connInfo); SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); - if (pConn == NULL) { - pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0); + if (pConn == NULL) { + pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, + pBasic->pid, pBasic->app, 0); if (pConn == NULL) { mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); return -1; @@ -345,7 +348,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } } else if (pConn->killed) { mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id); - mndReleaseConn(pMnode, pConn); + mndReleaseConn(pMnode, pConn); terrno = TSDB_CODE_MND_INVALID_CONNECTION; return -1; } @@ -369,8 +372,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } rspBasic->connId = pConn->id; - rspBasic->totalDnodes = 1; //TODO - rspBasic->onlineDnodes = 1; //TODO + rspBasic->totalDnodes = 1; // TODO + rspBasic->onlineDnodes = 1; // TODO mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndReleaseConn(pMnode, pConn); @@ -379,7 +382,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb int32_t kvNum = taosHashGetSize(pHbReq->info); if (NULL == pHbReq->info || kvNum <= 0) { - taosArrayPush(pBatchRsp->rsps, &hbRsp); + taosArrayPush(pBatchRsp->rsps, &hbRsp); return TSDB_CODE_SUCCESS; } @@ -604,8 +607,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int } static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pReq->pNode; - int32_t numOfRows = 0; + SMnode *pMnode = pReq->pNode; + int32_t numOfRows = 0; #if 0 SConnObj *pConn = NULL; int32_t cols = 0; @@ -703,7 +706,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i } pShow->numOfRows += numOfRows; -#endif +#endif return numOfRows; } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 1a14c94640..804fa2e17f 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -28,7 +28,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; - pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show"); + pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5000, true, (__cache_free_fn_t)mndFreeShowObj, "show"); if (pMgmt->cache == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to alloc show cache since %s", terrstr()); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index f17b8a4d88..a4b98ba01a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -534,8 +534,8 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE); - strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql); + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN); varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); colDataAppend(pColInfo, numOfRows, (const char *)sql, false); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 6f929ce829..2250aab35c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -18,8 +18,10 @@ #include "executor.h" #include "os.h" +#include "tcache.h" #include "thash.h" #include "tmsg.h" +#include "trpc.h" #include "ttimer.h" #include "wal.h" @@ -142,26 +144,37 @@ typedef struct { } STqMetaStore; typedef struct { - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - int64_t consumerId; - int32_t epoch; - int8_t subType; - int8_t withTbName; - int8_t withSchema; - int8_t withTag; - int8_t withTagSchema; - char* qmsg; + int64_t consumerId; + int32_t epoch; + int32_t skipLogNum; + int64_t reqOffset; + SRWLatch lock; + SRpcMsg* handle; +} STqPushHandle; + +typedef struct { + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int64_t consumerId; + int32_t epoch; + int8_t subType; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; + char* qmsg; + STqPushHandle pushHandle; // SRWLatch lock; SWalReadHandle* pWalReader; - // number should be identical to fetch thread num - STqReadHandle* pStreamReader[4]; - qTaskInfo_t task[4]; + // task number should be the same with fetch thread + STqReadHandle* pExecReader[5]; + qTaskInfo_t task[5]; } STqExec; struct STQ { char* path; // STqMetaStore* tqMeta; - SHashObj* execs; // subKey -> tqExec + SHashObj* pushMgr; // consumerId -> STqExec* + SHashObj* execs; // subKey -> STqExec SHashObj* pStreamTasks; SVnode* pVnode; SWal* pWal; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30f75218b4..aa04ae3b46 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -41,6 +41,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + return pTq; } @@ -52,8 +54,139 @@ void tqClose(STQ* pTq) { // TODO } +int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + if (msgType != TDMT_VND_SUBMIT) return 0; + void* pIter = NULL; + STqExec* pExec = NULL; + SSubmitReq* pReq = (SSubmitReq*)msg; + int32_t workerId = 4; + int64_t fetchOffset = ver; + + while (1) { + pIter = taosHashIterate(pTq->pushMgr, pIter); + if (pIter == NULL) break; + pExec = (STqExec**)pIter; + + taosWLockLatch(&pExec->pushHandle.lock); + + SRpcMsg* pMsg = atomic_load_ptr(&pExec->pushHandle.handle); + ASSERT(pMsg); + + SMqDataBlkRsp rsp = {0}; + rsp.reqOffset = pExec->pushHandle.reqOffset; + rsp.blockData = taosArrayInit(0, sizeof(int32_t)); + rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); + + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + qTaskInfo_t task = pExec->task[workerId]; + ASSERT(task); + qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + if (pDataBlock == NULL) break; + + ASSERT(pDataBlock->info.rows != 0); + ASSERT(pDataBlock->info.numOfCols != 0); + + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = ts; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pDataBlock->info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; + } + } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { + STqReadHandle* pReader = pExec->pExecReader[workerId]; + tqReadHandleSetMsg(pReader, pReq, 0); + while (tqNextDataBlock(pReader)) { + SSDataBlock block = {0}; + if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows, + &block.info.numOfCols) < 0) { + ASSERT(0); + } + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + /*pRetrieve->useconds = 0;*/ + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(block.info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; + } + } else { + ASSERT(0); + } + + if (rsp.blockNum == 0) { + taosWUnLockLatch(&pExec->pushHandle.lock); + continue; + } + + ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); + ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); + + rsp.rspOffset = fetchOffset; + + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pExec->pushHandle.epoch; + ((SMqRspHead*)buf)->consumerId = pExec->pushHandle.consumerId; + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqDataBlkRsp(&abuf, &rsp); + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + tmsgSendRsp(pMsg); + + atomic_store_ptr(&pExec->pushHandle.handle, NULL); + taosWUnLockLatch(&pExec->pushHandle.lock); + + vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", + TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum, + rsp.reqOffset, rsp.rspOffset); + + // TODO destroy + taosArrayDestroy(rsp.blockData); + taosArrayDestroy(rsp.blockDataLen); + } + + return 0; +} + int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) { if (msgType != TDMT_VND_SUBMIT) return 0; + void* data = taosMemoryMalloc(msgLen); if (data == NULL) { return -1; @@ -71,6 +204,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi .pCont = data, .contLen = msgLen, }; + tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); #if 0 @@ -240,6 +374,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReqV2* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; + int64_t waitTime = pReq->blockingTime; int32_t reqEpoch = pReq->epoch; int64_t fetchOffset; @@ -265,8 +400,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; - rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockData = taosArrayInit(0, sizeof(void*)); + rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); @@ -283,6 +418,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); + +#if 0 + // add to pushMgr + taosWLockLatch(&pExec->pushHandle.lock); + + pExec->pushHandle.consumerId = consumerId; + pExec->pushHandle.epoch = reqEpoch; + pExec->pushHandle.reqOffset = rsp.reqOffset; + pExec->pushHandle.skipLogNum = rsp.skipLogNum; + pExec->pushHandle.handle = pMsg; + + taosWUnLockLatch(&pExec->pushHandle.lock); + + // TODO add timer + + // TODO: the pointer will always be valid? + taosHashPut(pTq->pushMgr, &consumerId, sizeof(int64_t), &pExec, sizeof(void*)); + taosArrayDestroy(rsp.blockData); + taosArrayDestroy(rsp.blockDataLen); + return 0; +#endif + break; } @@ -325,7 +482,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { rsp.blockNum++; } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { - STqReadHandle* pReader = pExec->pStreamReader[workerId]; + STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pCont, 0); while (tqNextDataBlock(pReader)) { SSDataBlock block = {0}; @@ -635,10 +792,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { req.qmsg = NULL; pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); - for (int32_t i = 0; i < 4; i++) { - pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + for (int32_t i = 0; i < 5; i++) { + pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pExec->pStreamReader[i], + .reader = pExec->pExecReader[i], .meta = pTq->pVnode->pMeta, }; pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 1297fdbc27..893cf71a6f 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -85,11 +85,11 @@ int32_t tsem_wait(tsem_t* sem) { #include #include -static TdThread sem_thread; +static TdThread sem_thread; static TdThreadOnce sem_once; -static task_t sem_port; -static volatile int sem_inited = 0; -static semaphore_t sem_exit; +static task_t sem_port; +static volatile int sem_inited = 0; +static semaphore_t sem_exit; static void *sem_thread_routine(void *arg) { (void)arg; @@ -122,12 +122,12 @@ static void once_init(void) { struct tsem_s { #ifdef SEM_USE_PTHREAD - TdThreadMutex lock; - TdThreadCond cond; + TdThreadMutex lock; + TdThreadCond cond; volatile int64_t val; #elif defined(SEM_USE_POSIX) size_t id; - sem_t * sem; + sem_t *sem; #elif defined(SEM_USE_SEM) semaphore_t sem; #else // SEM_USE_PTHREAD @@ -140,7 +140,8 @@ struct tsem_s { int tsem_init(tsem_t *sem, int pshared, unsigned int value) { // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); if (*sem) { - fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p)); @@ -180,20 +181,22 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { int e = errno; if (e == EEXIST) continue; if (e == EINTR) continue; - fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e, - strerror(e)); + fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, + e, strerror(e)); abort(); } while (p->sem == SEM_FAILED); #elif defined(SEM_USE_SEM) taosThreadOnce(&sem_once, once_init); if (sem_inited != 1) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, + __func__, sem); errno = ENOMEM; return -1; } kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); if (ret != KERN_SUCCESS) { - fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); // we fail-fast here, because we have less-doc about semaphore_create for the moment abort(); } @@ -224,18 +227,21 @@ int tsem_wait(tsem_t *sem) { } #ifdef SEM_USE_PTHREAD if (taosThreadMutexLock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } p->val -= 1; if (p->val < 0) { if (taosThreadCondWait(&p->cond, &p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } } if (taosThreadMutexUnlock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } return 0; @@ -260,18 +266,21 @@ int tsem_post(tsem_t *sem) { } #ifdef SEM_USE_PTHREAD if (taosThreadMutexLock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } p->val += 1; if (p->val <= 0) { if (taosThreadCondSignal(&p->cond)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } } if (taosThreadMutexUnlock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } return 0; @@ -293,26 +302,30 @@ int tsem_destroy(tsem_t *sem) { } struct tsem_s *p = *sem; if (!p->valid) { - // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); - // abort(); + // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + // sem); abort(); return 0; } #ifdef SEM_USE_PTHREAD if (taosThreadMutexLock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } p->valid = 0; if (taosThreadCondDestroy(&p->cond)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } if (taosThreadMutexUnlock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } if (taosThreadMutexDestroy(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, + sem); abort(); } #elif defined(SEM_USE_POSIX) @@ -321,8 +334,8 @@ int tsem_destroy(tsem_t *sem) { int r = sem_unlink(name); if (r) { int e = errno; - fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e, - strerror(e)); + fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, + e, strerror(e)); abort(); } #elif defined(SEM_USE_SEM) @@ -424,4 +437,17 @@ int32_t tsem_wait(tsem_t* sem) { return ret; } +int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { + int ret = 0; + + struct timespec tv = { + .tv_sec = 0, + .tv_nsec = nanosecs, + }; + + while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue; + + return ret; +} + #endif diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index a69f11f285..e2dfde0503 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -19,16 +19,16 @@ #include "tlog.h" #include "tutil.h" -#define CACHE_MAX_CAPACITY 1024*1024*16 -#define CACHE_DEFAULT_CAPACITY 1024*4 +#define CACHE_MAX_CAPACITY 1024 * 1024 * 16 +#define CACHE_DEFAULT_CAPACITY 1024 * 4 -static TdThread cacheRefreshWorker = {0}; +static TdThread cacheRefreshWorker = {0}; static TdThreadOnce cacheThreadInit = PTHREAD_ONCE_INIT; static TdThreadMutex guard = PTHREAD_MUTEX_INITIALIZER; -static SArray *pCacheArrayList = NULL; -static bool stopRefreshWorker = false; -static bool refreshWorkerNormalStopped = false; -static bool refreshWorkerUnexpectedStopped = false; +static SArray *pCacheArrayList = NULL; +static bool stopRefreshWorker = false; +static bool refreshWorkerNormalStopped = false; +static bool refreshWorkerUnexpectedStopped = false; typedef struct SCacheNode { uint64_t addedTime; // the added time when this element is added or updated into cache @@ -36,7 +36,7 @@ typedef struct SCacheNode { int64_t expireTime; // expire time uint64_t signature; struct STrashElem *pTNodeHeader; // point to trash node head - uint16_t keyLen: 15; // max key size: 32kb + uint16_t keyLen : 15; // max key size: 32kb bool inTrashcan : 1; // denote if it is in trash or not uint32_t size; // allocated size for current SCacheNode uint32_t dataLen; @@ -47,8 +47,8 @@ typedef struct SCacheNode { } SCacheNode; typedef struct SCacheEntry { - int32_t num; // number of elements in current entry - SRWLatch latch; // entry latch + int32_t num; // number of elements in current entry + SRWLatch latch; // entry latch SCacheNode *next; } SCacheEntry; @@ -75,24 +75,24 @@ typedef struct SCacheIter { * when the node in pTrash does not be referenced, it will be release at the expired expiredTime */ struct SCacheObj { - int64_t sizeInBytes; // total allocated buffer in this hash table, SCacheObj is not included. - int64_t refreshTime; - char *name; - SCacheStatis statistics; - - SCacheEntry *pEntryList; - size_t capacity; // number of slots - size_t numOfElems; // number of elements in cache - _hash_fn_t hashFp; // hash function - __cache_free_fn_t freeFp; - - uint32_t numOfElemsInTrash; // number of element in trash - STrashElem *pTrash; - - uint8_t deleting; // set the deleting flag to stop refreshing ASAP. - TdThread refreshWorker; - bool extendLifespan; // auto extend life span when one item is accessed. - int64_t checkTick; // tick used to record the check times of the refresh threads + int64_t sizeInBytes; // total allocated buffer in this hash table, SCacheObj is not included. + int64_t refreshTime; + char *name; + SCacheStatis statistics; + + SCacheEntry *pEntryList; + size_t capacity; // number of slots + size_t numOfElems; // number of elements in cache + _hash_fn_t hashFp; // hash function + __cache_free_fn_t freeFp; + + uint32_t numOfElemsInTrash; // number of element in trash + STrashElem *pTrash; + + uint8_t deleting; // set the deleting flag to stop refreshing ASAP. + TdThread refreshWorker; + bool extendLifespan; // auto extend life span when one item is accessed. + int64_t checkTick; // tick used to record the check times of the refresh threads #if defined(LINUX) TdThreadRwlock lock; #else @@ -182,7 +182,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) { * @return SCacheNode */ static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, - uint64_t duration); + uint64_t duration); /** * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash @@ -267,7 +267,7 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) { pEntry->num += 1; } -static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pNode) { +static void removeNodeInEntryList(SCacheEntry *pe, SCacheNode *prev, SCacheNode *pNode) { if (prev == NULL) { ASSERT(pe->next == pNode); pe->next = pNode->pNext; @@ -279,14 +279,14 @@ static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pe->num -= 1; } -static FORCE_INLINE SCacheEntry* doFindEntry(SCacheObj* pCacheObj, const void* key, size_t keyLen) { +static FORCE_INLINE SCacheEntry *doFindEntry(SCacheObj *pCacheObj, const void *key, size_t keyLen) { uint32_t hashVal = (*pCacheObj->hashFp)(key, keyLen); int32_t slot = hashVal % pCacheObj->capacity; return &pCacheObj->pEntryList[slot]; } -static FORCE_INLINE SCacheNode * -doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode** prev) { +static FORCE_INLINE SCacheNode *doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, + SCacheNode **prev) { SCacheNode *pNode = pe->next; while (pNode) { if ((pNode->keyLen == keyLen) && memcmp(pNode->key, key, keyLen) == 0) { @@ -299,9 +299,9 @@ doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode* return pNode; } -static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { +static bool doRemoveExpiredFn(void *param, SCacheNode *pNode) { SCacheObjTravSup *ps = (SCacheObjTravSup *)param; - SCacheObj *pCacheObj = ps->pCacheObj; + SCacheObj *pCacheObj = ps->pCacheObj; if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); @@ -320,7 +320,7 @@ static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { static bool doRemoveNodeFn(void *param, SCacheNode *pNode) { SCacheObjTravSup *ps = (SCacheObjTravSup *)param; - SCacheObj *pCacheObj = ps->pCacheObj; + SCacheObj *pCacheObj = ps->pCacheObj; if (T_REF_VAL_GET(pNode) == 0) { taosCacheReleaseNode(pCacheObj, pNode); @@ -347,14 +347,14 @@ static FORCE_INLINE int32_t getCacheCapacity(int32_t length) { len = (len << 1u); } - return len > CACHE_MAX_CAPACITY? CACHE_MAX_CAPACITY:len; + return len > CACHE_MAX_CAPACITY ? CACHE_MAX_CAPACITY : len; } -SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInMs, bool extendLifespan, __cache_free_fn_t fn, const char *cacheName) { const int32_t SLEEP_DURATION = 500; // 500 ms - if (refreshTimeInSeconds <= 0) { + if (refreshTimeInMs <= 0) { return NULL; } @@ -374,10 +374,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext } // set free cache node callback function - pCacheObj->hashFp = taosGetDefaultHashFunction(keyType); - pCacheObj->freeFp = fn; - pCacheObj->refreshTime = refreshTimeInSeconds * 1000; - pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; + pCacheObj->hashFp = taosGetDefaultHashFunction(keyType); + pCacheObj->freeFp = fn; + pCacheObj->refreshTime = refreshTimeInMs; + pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access if (__trashcan_lock_init(pCacheObj) != 0) { @@ -411,8 +411,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v taosWLockLatch(&pe->latch); - SCacheNode *prev = NULL; - SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev); + SCacheNode *prev = NULL; + SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev); if (pNode == NULL) { pushfrontNodeInEntryList(pe, pNode1); @@ -460,12 +460,12 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen return NULL; } - SCacheNode *prev = NULL; + SCacheNode *prev = NULL; SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen); taosRLockLatch(&pe->latch); - SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev); + SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev); if (pNode != NULL) { int32_t ref = T_REF_INC(pNode); ASSERT(ref > 0); @@ -589,7 +589,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } else { // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread // when reaches here. - SCacheNode * prev = NULL; + SCacheNode *prev = NULL; SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen); taosWLockLatch(&pe->latch); @@ -646,7 +646,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } } -void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) { +void doTraverseElems(SCacheObj *pCacheObj, bool (*fp)(void *param, SCacheNode *pNode), SCacheObjTravSup *pSup) { int32_t numOfEntries = (int32_t)pCacheObj->capacity; for (int32_t i = 0; i < numOfEntries; ++i) { SCacheEntry *pEntry = &pCacheObj->pEntryList[i]; @@ -675,7 +675,7 @@ void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* p } } -void taosCacheEmpty(SCacheObj* pCacheObj) { +void taosCacheEmpty(SCacheObj *pCacheObj) { SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; doTraverseElems(pCacheObj, doRemoveNodeFn, &sup); taosTrashcanEmpty(pCacheObj, false); @@ -710,20 +710,20 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat return NULL; } - pNewNode->data = (char*)pNewNode + sizeof(SCacheNode); + pNewNode->data = (char *)pNewNode + sizeof(SCacheNode); pNewNode->dataLen = size; memcpy(pNewNode->data, pData, size); - pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size; + pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size; pNewNode->keyLen = (uint16_t)keyLen; memcpy(pNewNode->key, key, keyLen); - pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); - pNewNode->lifespan = duration; + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->lifespan = duration; pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan; - pNewNode->signature = (uint64_t)pNewNode; - pNewNode->size = (uint32_t)sizeInBytes; + pNewNode->signature = (uint64_t)pNewNode; + pNewNode->size = (uint32_t)sizeInBytes; return pNewNode; } @@ -914,21 +914,19 @@ void taosStopCacheRefreshWorker(void) { taosArrayDestroy(pCacheArrayList); } -size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj) { - return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; -} +size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj) { return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; } -SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj) { +SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj) { ASSERT(pCacheObj != NULL); - SCacheIter* pIter = taosMemoryCalloc(1, sizeof(SCacheIter)); - pIter->pCacheObj = (SCacheObj*) pCacheObj; + SCacheIter *pIter = taosMemoryCalloc(1, sizeof(SCacheIter)); + pIter->pCacheObj = (SCacheObj *)pCacheObj; pIter->entryIndex = -1; - pIter->index = -1; + pIter->index = -1; return pIter; } -bool taosCacheIterNext(SCacheIter* pIter) { - SCacheObj* pCacheObj = pIter->pCacheObj; +bool taosCacheIterNext(SCacheIter *pIter) { + SCacheObj *pCacheObj = pIter->pCacheObj; if (pIter->index + 1 >= pIter->numOfObj) { if (pIter->entryIndex + 1 >= pCacheObj->capacity) { @@ -936,9 +934,9 @@ bool taosCacheIterNext(SCacheIter* pIter) { } // release the reference for all objects in the snapshot - for(int32_t i = 0; i < pIter->numOfObj; ++i) { - char* p= pIter->pCurrent[i]->data; - taosCacheRelease(pCacheObj, (void**) &p, false); + for (int32_t i = 0; i < pIter->numOfObj; ++i) { + char *p = pIter->pCurrent[i]->data; + taosCacheRelease(pCacheObj, (void **)&p, false); pIter->pCurrent[i] = NULL; } @@ -967,7 +965,7 @@ bool taosCacheIterNext(SCacheIter* pIter) { pIter->pCurrent = (SCacheNode **)tmp; } - SCacheNode* pNode = pEntry->next; + SCacheNode *pNode = pEntry->next; for (int32_t i = 0; i < pEntry->num; ++i) { ASSERT(pNode != NULL); @@ -981,7 +979,7 @@ bool taosCacheIterNext(SCacheIter* pIter) { pIter->numOfObj = pEntry->num; taosRUnLockLatch(&pEntry->latch); - pIter->index = -1; + pIter->index = -1; break; } } @@ -990,19 +988,19 @@ bool taosCacheIterNext(SCacheIter* pIter) { return true; } -void* taosCacheIterGetData(const SCacheIter* pIter, size_t* len) { - SCacheNode* pNode = pIter->pCurrent[pIter->index]; +void *taosCacheIterGetData(const SCacheIter *pIter, size_t *len) { + SCacheNode *pNode = pIter->pCurrent[pIter->index]; *len = pNode->dataLen; return pNode->data; } -void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* len) { - SCacheNode* pNode = pIter->pCurrent[pIter->index]; +void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) { + SCacheNode *pNode = pIter->pCurrent[pIter->index]; *len = pNode->keyLen; return pNode->key; } -void taosCacheDestroyIter(SCacheIter* pIter) { +void taosCacheDestroyIter(SCacheIter *pIter) { taosMemoryFreeClear(pIter->pCurrent); taosMemoryFreeClear(pIter); -} \ No newline at end of file +} diff --git a/source/util/test/cacheTest.cpp b/source/util/test/cacheTest.cpp index 748cf31b67..45c887288c 100644 --- a/source/util/test/cacheTest.cpp +++ b/source/util/test/cacheTest.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include "os.h" #include "taos.h" @@ -8,132 +8,134 @@ // test cache TEST(cacheTest, client_cache_test) { const int32_t REFRESH_TIME_IN_SEC = 2; - SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test"); + SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC * 1000, 0, NULL, "test"); const char* key1 = "test1"; - char data1[] = "test11"; + char data1[] = "test11"; - char* cachedObj = (char*) taosCachePut(tscMetaCache, key1, strlen(key1), data1, strlen(data1)+1, 1); - taosSsleep(REFRESH_TIME_IN_SEC+1); + char* cachedObj = (char*)taosCachePut(tscMetaCache, key1, strlen(key1), data1, strlen(data1) + 1, 1); + taosSsleep(REFRESH_TIME_IN_SEC + 1); printf("obj is still valid: %s\n", cachedObj); char data2[] = "test22"; - taosCacheRelease(tscMetaCache, (void**) &cachedObj, false); + taosCacheRelease(tscMetaCache, (void**)&cachedObj, false); /* the object is cleared by cache clean operation */ - cachedObj = (char*) taosCachePut(tscMetaCache, key1, strlen(key1), data2, strlen(data2)+1, 20); + cachedObj = (char*)taosCachePut(tscMetaCache, key1, strlen(key1), data2, strlen(data2) + 1, 20); printf("after updated: %s\n", cachedObj); printf("start to remove data from cache\n"); - taosCacheRelease(tscMetaCache, (void**) &cachedObj, false); + taosCacheRelease(tscMetaCache, (void**)&cachedObj, false); printf("end of removing data from cache\n"); const char* key3 = "test2"; const char* data3 = "kkkkkkk"; - char* cachedObj2 = (char*) taosCachePut(tscMetaCache, key3, strlen(key3), data3, strlen(data3) + 1, 1); + char* cachedObj2 = (char*)taosCachePut(tscMetaCache, key3, strlen(key3), data3, strlen(data3) + 1, 1); printf("%s\n", cachedObj2); - taosCacheRelease(tscMetaCache, (void**) &cachedObj2, false); + taosCacheRelease(tscMetaCache, (void**)&cachedObj2, false); taosSsleep(3); - char* d = (char*) taosCacheAcquireByKey(tscMetaCache, key3, strlen(key3)); + char* d = (char*)taosCacheAcquireByKey(tscMetaCache, key3, strlen(key3)); assert(d == NULL); char key5[] = "test5"; char data5[] = "data5kkkkk"; - cachedObj2 = (char*) taosCachePut(tscMetaCache, key5, strlen(key5), data5, strlen(data5) + 1, 20); + cachedObj2 = (char*)taosCachePut(tscMetaCache, key5, strlen(key5), data5, strlen(data5) + 1, 20); - const char* data6= "new Data after updated"; - taosCacheRelease(tscMetaCache, (void**) &cachedObj2, false); + const char* data6 = "new Data after updated"; + taosCacheRelease(tscMetaCache, (void**)&cachedObj2, false); - cachedObj2 = (char*) taosCachePut(tscMetaCache, key5, strlen(key5), data6, strlen(data6) + 1, 20); + cachedObj2 = (char*)taosCachePut(tscMetaCache, key5, strlen(key5), data6, strlen(data6) + 1, 20); printf("%s\n", cachedObj2); - taosCacheRelease(tscMetaCache, (void**) &cachedObj2, true); + taosCacheRelease(tscMetaCache, (void**)&cachedObj2, true); const char* data7 = "add call update procedure"; - cachedObj2 = (char*) taosCachePut(tscMetaCache, key5, strlen(key5), data7, strlen(data7) + 1, 20); + cachedObj2 = (char*)taosCachePut(tscMetaCache, key5, strlen(key5), data7, strlen(data7) + 1, 20); printf("%s\n=======================================\n\n", cachedObj2); - char* cc = (char*) taosCacheAcquireByKey(tscMetaCache, key5, strlen(key5)); + char* cc = (char*)taosCacheAcquireByKey(tscMetaCache, key5, strlen(key5)); - taosCacheRelease(tscMetaCache, (void**) &cachedObj2, true); - taosCacheRelease(tscMetaCache, (void**) &cc, false); + taosCacheRelease(tscMetaCache, (void**)&cachedObj2, true); + taosCacheRelease(tscMetaCache, (void**)&cc, false); const char* data8 = "ttft"; const char* key6 = "key6"; - char* ft = (char*) taosCachePut(tscMetaCache, key6, strlen(key6), data8, strlen(data8), 20); - taosCacheRelease(tscMetaCache, (void**) &ft, false); + char* ft = (char*)taosCachePut(tscMetaCache, key6, strlen(key6), data8, strlen(data8), 20); + taosCacheRelease(tscMetaCache, (void**)&ft, false); /** * 140ns */ uint64_t startTime = taosGetTimestampUs(); printf("Cache Performance Test\nstart time:%" PRIu64 "\n", startTime); - for(int32_t i=0; i<1000; ++i) { - char* dd = (char*) taosCacheAcquireByKey(tscMetaCache, key6, strlen(key6)); + for (int32_t i = 0; i < 1000; ++i) { + char* dd = (char*)taosCacheAcquireByKey(tscMetaCache, key6, strlen(key6)); if (dd != NULL) { -// printf("get the data\n"); + // printf("get the data\n"); } else { printf("data has been released\n"); } - taosCacheRelease(tscMetaCache, (void**) &dd, false); + taosCacheRelease(tscMetaCache, (void**)&dd, false); } uint64_t endTime = taosGetTimestampUs(); - int64_t el = endTime - startTime; + int64_t el = endTime - startTime; - printf("End of Test, %" PRIu64 "\nTotal Elapsed Time:%" PRIu64 " us.avg:%f us\n", endTime, el, el/1000.0); + printf("End of Test, %" PRIu64 "\nTotal Elapsed Time:%" PRIu64 " us.avg:%f us\n", endTime, el, el / 1000.0); taosCacheCleanup(tscMetaCache); } TEST(cacheTest, cache_iter_test) { const int32_t REFRESH_TIME_IN_SEC = 2; - auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test"); + auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC * 1000, false, NULL, "test"); char key[256] = {0}; char data[1024] = "abcdefghijk"; -// int32_t len = strlen(data); + // int32_t len = strlen(data); uint64_t startTime = taosGetTimestampUs(); - int32_t num = 10000; + int32_t num = 10000; - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { int32_t len = sprintf(key, "abc_%7d", i); taosCachePut(pCache, key, strlen(key), data, len, 3600); } uint64_t endTime = taosGetTimestampUs(); - printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num); + printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime, + (endTime - startTime) / (double)num); startTime = taosGetTimestampUs(); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { int32_t len = sprintf(key, "abc_%7d", i); - void* k = taosCacheAcquireByKey(pCache, key, len); + void* k = taosCacheAcquireByKey(pCache, key, len); assert(k != 0); } endTime = taosGetTimestampUs(); - printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num); + printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime, + (endTime - startTime) / (double)num); - int32_t count = 0; + int32_t count = 0; SCacheIter* pIter = taosCacheCreateIter(pCache); - while(taosCacheIterNext(pIter)) { + while (taosCacheIterNext(pIter)) { size_t keyLen = 0; size_t dataLen = 0; char* key1 = static_cast(taosCacheIterGetKey(pIter, &keyLen)); char* data1 = static_cast(taosCacheIterGetData(pIter, &dataLen)); -// char d[256] = {0}; -// memcpy(d, data1, dataLen); -// char k[256] = {0}; -// memcpy(k, key1, keyLen); + // char d[256] = {0}; + // memcpy(d, data1, dataLen); + // char k[256] = {0}; + // memcpy(k, key1, keyLen); } ASSERT_EQ(count, num); -- GitLab