/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "rpcCache.h" #include "os.h" #include "rpcLog.h" #include "taosdef.h" #include "tglobal.h" #include "tmempool.h" #include "ttimer.h" #include "tutil.h" typedef struct SConnHash { char fqdn[TSDB_FQDN_LEN]; uint16_t port; char connType; struct SConnHash *prev; struct SConnHash *next; void * data; uint64_t time; } SConnHash; typedef struct { SConnHash ** connHashList; mpool_h connHashMemPool; int maxSessions; int total; int * count; int64_t keepTimer; TdThreadMutex mutex; void (*cleanFp)(void *); void * tmrCtrl; void * pTimer; int64_t *lockedBy; } SConnCache; static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType); static void rpcLockCache(int64_t *lockedBy); static void rpcUnlockCache(int64_t *lockedBy); static void rpcCleanConnCache(void *handle, void *tmrId); static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time); void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { SConnHash **connHashList; mpool_h connHashMemPool; SConnCache *pCache; connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); if (connHashMemPool == 0) return NULL; connHashList = taosMemoryCalloc(sizeof(SConnHash *), maxSessions); if (connHashList == 0) { taosMemPoolCleanUp(connHashMemPool); return NULL; } pCache = taosMemoryMalloc(sizeof(SConnCache)); if (pCache == NULL) { taosMemPoolCleanUp(connHashMemPool); taosMemoryFree(connHashList); return NULL; } memset(pCache, 0, sizeof(SConnCache)); pCache->count = taosMemoryCalloc(sizeof(int), maxSessions); pCache->total = 0; pCache->keepTimer = keepTimer; pCache->maxSessions = maxSessions; pCache->connHashMemPool = connHashMemPool; pCache->connHashList = connHashList; pCache->cleanFp = cleanFp; pCache->tmrCtrl = tmrCtrl; pCache->lockedBy = taosMemoryCalloc(sizeof(int64_t), maxSessions); taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer); taosThreadMutexInit(&pCache->mutex, NULL); return pCache; } void rpcCloseConnCache(void *handle) { SConnCache *pCache; pCache = (SConnCache *)handle; if (pCache == NULL || pCache->maxSessions == 0) return; taosThreadMutexLock(&pCache->mutex); taosTmrStopA(&(pCache->pTimer)); if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); taosMemoryFreeClear(pCache->connHashList); taosMemoryFreeClear(pCache->count); taosMemoryFreeClear(pCache->lockedBy); taosThreadMutexUnlock(&pCache->mutex); taosThreadMutexDestroy(&pCache->mutex); memset(pCache, 0, sizeof(SConnCache)); taosMemoryFree(pCache); } void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) { int hash; SConnHash * pNode; SConnCache *pCache; uint64_t time = taosGetTimestampMs(); pCache = (SConnCache *)handle; assert(pCache); assert(data); hash = rpcHashConn(pCache, fqdn, port, connType); pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn)); pNode->port = port; pNode->connType = connType; pNode->data = data; pNode->prev = NULL; pNode->time = time; rpcLockCache(pCache->lockedBy + hash); pNode->next = pCache->connHashList[hash]; if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; pCache->connHashList[hash] = pNode; pCache->count[hash]++; rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); rpcUnlockCache(pCache->lockedBy + hash); pCache->total++; // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, // pCache->count[hash]); return; } void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) { int hash; SConnHash * pNode; SConnCache *pCache; void * pData = NULL; pCache = (SConnCache *)handle; assert(pCache); uint64_t time = taosGetTimestampMs(); hash = rpcHashConn(pCache, fqdn, port, connType); rpcLockCache(pCache->lockedBy + hash); pNode = pCache->connHashList[hash]; while (pNode) { if (time >= pCache->keepTimer + pNode->time) { rpcRemoveExpiredNodes(pCache, pNode, hash, time); pNode = NULL; break; } if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break; pNode = pNode->next; } if (pNode) { rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); if (pNode->prev) { pNode->prev->next = pNode->next; } else { pCache->connHashList[hash] = pNode->next; } if (pNode->next) { pNode->next->prev = pNode->prev; } pData = pNode->data; taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pCache->total--; pCache->count[hash]--; } rpcUnlockCache(pCache->lockedBy + hash); if (pData) { // tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, // pCache->count[hash]); } else { // tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, // pCache->count[hash]); } return pData; } static void rpcCleanConnCache(void *handle, void *tmrId) { int hash; SConnHash * pNode; SConnCache *pCache; pCache = (SConnCache *)handle; if (pCache == NULL || pCache->maxSessions == 0) return; if (pCache->pTimer != tmrId) return; taosThreadMutexLock(&pCache->mutex); uint64_t time = taosGetTimestampMs(); for (hash = 0; hash < pCache->maxSessions; ++hash) { rpcLockCache(pCache->lockedBy + hash); pNode = pCache->connHashList[hash]; rpcRemoveExpiredNodes(pCache, pNode, hash, time); rpcUnlockCache(pCache->lockedBy + hash); } // tTrace("timer, total connections in cache:%d", pCache->total); taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer); taosThreadMutexUnlock(&pCache->mutex); } static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return; SConnHash *pPrev = pNode->prev, *pNext; while (pNode) { (*pCache->cleanFp)(pNode->data); pNext = pNode->next; pCache->total--; pCache->count[hash]--; // tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, // pNode->connType, hash, pNode, // pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; } if (pPrev) pPrev->next = NULL; else pCache->connHashList[hash] = NULL; } static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) { SConnCache *pCache = (SConnCache *)handle; int hash = 0; char * temp = fqdn; while (*temp) { hash += *temp; ++temp; } hash += port; hash += connType; hash = hash % pCache->maxSessions; return hash; } static void rpcLockCache(int64_t *lockedBy) { int64_t tid = taosGetSelfPthreadId(); int i = 0; while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { if (++i % 100 == 0) { sched_yield(); } } } static void rpcUnlockCache(int64_t *lockedBy) { int64_t tid = taosGetSelfPthreadId(); if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { assert(false); } }