提交 32e98876 编写于 作者: L Liu Jicong

feat(tmq): add push mode

上级 3035ecd5
...@@ -101,8 +101,6 @@ int32_t create_topic() { ...@@ -101,8 +101,6 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
......
...@@ -245,6 +245,8 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); ...@@ -245,6 +245,8 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
#if 0 #if 0
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
......
...@@ -22,21 +22,28 @@ extern "C" { ...@@ -22,21 +22,28 @@ extern "C" {
#include <semaphore.h> #include <semaphore.h>
#if defined (_TD_DARWIN_64) #if defined(_TD_DARWIN_64)
typedef struct tsem_s *tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value); typedef struct tsem_s *tsem_t;
int tsem_wait(tsem_t *sem);
int tsem_post(tsem_t *sem); int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_destroy(tsem_t *sem); int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t nanosecs);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#else #else
#define tsem_t sem_t
#define tsem_init sem_init #define tsem_t sem_t
int tsem_wait(tsem_t* sem); #define tsem_init sem_init
#define tsem_post sem_post int tsem_wait(tsem_t *sem);
#define tsem_destroy sem_destroy int tsem_timewait(tsem_t *sim, int64_t nanosecs);
#define tsem_post sem_post
#define tsem_destroy sem_destroy
#endif #endif
#if defined (_TD_DARWIN_64) #if defined(_TD_DARWIN_64)
// #define TdThreadRwlock TdThreadMutex // #define TdThreadRwlock TdThreadMutex
// #define taosThreadRwlockInit(lock, NULL) taosThreadMutexInit(lock, NULL) // #define taosThreadRwlockInit(lock, NULL) taosThreadMutexInit(lock, NULL)
// #define taosThreadRwlockDestroy(lock) taosThreadMutexDestroy(lock) // #define taosThreadRwlockDestroy(lock) taosThreadMutexDestroy(lock)
...@@ -44,20 +51,20 @@ extern "C" { ...@@ -44,20 +51,20 @@ extern "C" {
// #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock) // #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock)
// #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock) // #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock)
#define TdThreadSpinlock TdThreadMutex #define TdThreadSpinlock TdThreadMutex
#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL) #define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
#define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock) #define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock)
#define taosThreadSpinLock(lock) taosThreadMutexLock(lock) #define taosThreadSpinLock(lock) taosThreadMutexLock(lock)
#define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock) #define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock)
#endif #endif
bool taosCheckPthreadValid(TdThread thread); bool taosCheckPthreadValid(TdThread thread);
int64_t taosGetSelfPthreadId(); int64_t taosGetSelfPthreadId();
int64_t taosGetPthreadId(TdThread thread); int64_t taosGetPthreadId(TdThread thread);
void taosResetPthread(TdThread* thread); void taosResetPthread(TdThread *thread);
bool taosComparePthread(TdThread first, TdThread second); bool taosComparePthread(TdThread first, TdThread second);
int32_t taosGetPId(); int32_t taosGetPId();
int32_t taosGetAppName(char* name, int32_t* len); int32_t taosGetAppName(char *name, int32_t *len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -47,13 +47,13 @@ typedef struct STrashElem STrashElem; ...@@ -47,13 +47,13 @@ typedef struct STrashElem STrashElem;
/** /**
* initialize the cache object * initialize the cache object
* @param keyType key type * @param keyType key type
* @param refreshTimeInSeconds refresh operation interval time, the maximum survival time when one element is expired * @param refreshTimeInMs refresh operation interval time, the maximum survival time when one element is expired
* and not referenced by other objects * and not referenced by other objects
* @param extendLifespan auto extend lifespan, if accessed * @param extendLifespan auto extend lifespan, if accessed
* @param fn free resource callback function * @param fn free resource callback function
* @return * @return
*/ */
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInMs, bool extendLifespan, __cache_free_fn_t fn,
const char *cacheName); const char *cacheName);
/** /**
...@@ -111,7 +111,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); ...@@ -111,7 +111,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
* @param pCacheObj * @param pCacheObj
* @return * @return
*/ */
size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj); size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj);
/** /**
* move all data node into trash, clear node in trash can if it is not referenced by any clients * move all data node into trash, clear node in trash can if it is not referenced by any clients
...@@ -145,11 +145,11 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1); ...@@ -145,11 +145,11 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1);
*/ */
void taosStopCacheRefreshWorker(); void taosStopCacheRefreshWorker();
SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj); SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj);
bool taosCacheIterNext(SCacheIter* pIter); bool taosCacheIterNext(SCacheIter *pIter);
void* taosCacheIterGetData(const SCacheIter* pIter, size_t* dataLen); void *taosCacheIterGetData(const SCacheIter *pIter, size_t *dataLen);
void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* keyLen); void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *keyLen);
void taosCacheDestroyIter(SCacheIter* pIter); void taosCacheDestroyIter(SCacheIter *pIter);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -260,6 +260,16 @@ void tmq_list_destroy(tmq_list_t* list) { ...@@ -260,6 +260,16 @@ void tmq_list_destroy(tmq_list_t* list) {
taosArrayDestroy(container); taosArrayDestroy(container);
} }
int32_t tmq_list_get_size(const tmq_list_t* list) {
const SArray* container = &list->container;
return taosArrayGetSize(container);
}
char** tmq_list_to_c_array(const tmq_list_t* list) {
const SArray* container = &list->container;
return container->pData;
}
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg); return sprintf(dst, "%s:%d", topicName, vg);
} }
...@@ -387,7 +397,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -387,7 +397,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->consumerId = tGenIdPI64();
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
if (pTmq->clientTopics == NULL) { if (pTmq->clientTopics == NULL) {
taosMemoryFree(pTmq); taosMemoryFree(pTmq);
......
...@@ -49,6 +49,7 @@ static const SPerfsTableSchema topicSchema[] = { ...@@ -49,6 +49,7 @@ static const SPerfsTableSchema topicSchema[] = {
static const SPerfsTableSchema consumerSchema[] = { static const SPerfsTableSchema consumerSchema[] = {
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
...@@ -61,6 +62,7 @@ static const SPerfsTableSchema subscribeSchema[] = { ...@@ -61,6 +62,7 @@ static const SPerfsTableSchema subscribeSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
}; };
......
...@@ -24,20 +24,20 @@ ...@@ -24,20 +24,20 @@
#include "version.h" #include "version.h"
typedef struct { typedef struct {
uint32_t id; uint32_t id;
int8_t connType; int8_t connType;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time int64_t appStartTimeMs; // app start time
int32_t pid; // pid of app that invokes taosc int32_t pid; // pid of app that invokes taosc
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int8_t killed; int8_t killed;
int64_t loginTimeMs; int64_t loginTimeMs;
int64_t lastAccessTimeMs; int64_t lastAccessTimeMs;
uint64_t killId; uint64_t killId;
int32_t numOfQueries; int32_t numOfQueries;
SArray *pQueries; //SArray<SQueryDesc> SArray *pQueries; // SArray<SQueryDesc>
} SConnObj; } SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
...@@ -58,7 +58,8 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); ...@@ -58,7 +58,8 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) { int32_t mndInitProfile(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connCheckTime = tsShellActivityTimer * 2; // in ms
int32_t connCheckTime = tsShellActivityTimer * 2 * 1000;
pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn"); pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
if (pMgmt->cache == NULL) { if (pMgmt->cache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -71,9 +72,9 @@ int32_t mndInitProfile(SMnode *pMnode) { ...@@ -71,9 +72,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
return 0; return 0;
...@@ -91,7 +92,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType ...@@ -91,7 +92,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
int32_t pid, const char *app, int64_t startTime) { int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
char connStr[255] = {0}; char connStr[255] = {0};
int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
int32_t connId = mndGenerateUid(connStr, len); int32_t connId = mndGenerateUid(connStr, len);
if (startTime == 0) startTime = taosGetTimestampMs(); if (startTime == 0) startTime = taosGetTimestampMs();
...@@ -253,8 +254,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { ...@@ -253,8 +254,8 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->pQueries = pBasic->queryDesc; pConn->pQueries = pBasic->queryDesc;
pBasic->queryDesc = NULL; pBasic->queryDesc = NULL;
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -324,9 +325,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { ...@@ -324,9 +325,10 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
return NULL; return NULL;
} }
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) { static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
if (pHbReq->query) { if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query; SQueryHbReqBasic *pBasic = pHbReq->query;
...@@ -335,8 +337,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -335,8 +337,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
rpcGetConnInfo(pMsg->handle, &connInfo); rpcGetConnInfo(pMsg->handle, &connInfo);
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0); pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
pBasic->pid, pBasic->app, 0);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1; return -1;
...@@ -345,7 +348,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -345,7 +348,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
} else if (pConn->killed) { } else if (pConn->killed) {
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id); mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_MND_INVALID_CONNECTION; terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1; return -1;
} }
...@@ -369,8 +372,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -369,8 +372,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
rspBasic->connId = pConn->id; rspBasic->connId = pConn->id;
rspBasic->totalDnodes = 1; //TODO rspBasic->totalDnodes = 1; // TODO
rspBasic->onlineDnodes = 1; //TODO rspBasic->onlineDnodes = 1; // TODO
mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
...@@ -379,7 +382,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb ...@@ -379,7 +382,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
int32_t kvNum = taosHashGetSize(pHbReq->info); int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) { if (NULL == pHbReq->info || kvNum <= 0) {
taosArrayPush(pBatchRsp->rsps, &hbRsp); taosArrayPush(pBatchRsp->rsps, &hbRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -604,8 +607,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int ...@@ -604,8 +607,8 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int
} }
static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
#if 0 #if 0
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t cols = 0; int32_t cols = 0;
...@@ -703,7 +706,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i ...@@ -703,7 +706,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
#endif #endif
return numOfRows; return numOfRows;
} }
......
...@@ -28,7 +28,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); ...@@ -28,7 +28,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
int32_t mndInitShow(SMnode *pMnode) { int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show"); pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5000, true, (__cache_free_fn_t)mndFreeShowObj, "show");
if (pMgmt->cache == NULL) { if (pMgmt->cache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc show cache since %s", terrstr()); mError("failed to alloc show cache since %s", terrstr());
......
...@@ -534,8 +534,8 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -534,8 +534,8 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE); char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql); tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
colDataAppend(pColInfo, numOfRows, (const char *)sql, false); colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
......
...@@ -18,8 +18,10 @@ ...@@ -18,8 +18,10 @@
#include "executor.h" #include "executor.h"
#include "os.h" #include "os.h"
#include "tcache.h"
#include "thash.h" #include "thash.h"
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
...@@ -142,26 +144,37 @@ typedef struct { ...@@ -142,26 +144,37 @@ typedef struct {
} STqMetaStore; } STqMetaStore;
typedef struct { typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int64_t consumerId;
int64_t consumerId; int32_t epoch;
int32_t epoch; int32_t skipLogNum;
int8_t subType; int64_t reqOffset;
int8_t withTbName; SRWLatch lock;
int8_t withSchema; SRpcMsg* handle;
int8_t withTag; } STqPushHandle;
int8_t withTagSchema;
char* qmsg; typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t subType;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
char* qmsg;
STqPushHandle pushHandle;
// SRWLatch lock; // SRWLatch lock;
SWalReadHandle* pWalReader; SWalReadHandle* pWalReader;
// number should be identical to fetch thread num // task number should be the same with fetch thread
STqReadHandle* pStreamReader[4]; STqReadHandle* pExecReader[5];
qTaskInfo_t task[4]; qTaskInfo_t task[5];
} STqExec; } STqExec;
struct STQ { struct STQ {
char* path; char* path;
// STqMetaStore* tqMeta; // STqMetaStore* tqMeta;
SHashObj* execs; // subKey -> tqExec SHashObj* pushMgr; // consumerId -> STqExec*
SHashObj* execs; // subKey -> STqExec
SHashObj* pStreamTasks; SHashObj* pStreamTasks;
SVnode* pVnode; SVnode* pVnode;
SWal* pWal; SWal* pWal;
......
...@@ -41,6 +41,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ...@@ -41,6 +41,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
return pTq; return pTq;
} }
...@@ -52,8 +54,139 @@ void tqClose(STQ* pTq) { ...@@ -52,8 +54,139 @@ void tqClose(STQ* pTq) {
// TODO // TODO
} }
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
STqExec* pExec = NULL;
SSubmitReq* pReq = (SSubmitReq*)msg;
int32_t workerId = 4;
int64_t fetchOffset = ver;
while (1) {
pIter = taosHashIterate(pTq->pushMgr, pIter);
if (pIter == NULL) break;
pExec = (STqExec**)pIter;
taosWLockLatch(&pExec->pushHandle.lock);
SRpcMsg* pMsg = atomic_load_ptr(&pExec->pushHandle.handle);
ASSERT(pMsg);
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pExec->pushHandle.reqOffset;
rsp.blockData = taosArrayInit(0, sizeof(int32_t));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId];
ASSERT(task);
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows,
&block.info.numOfCols) < 0) {
ASSERT(0);
}
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
/*pRetrieve->useconds = 0;*/
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(block.info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
} else {
ASSERT(0);
}
if (rsp.blockNum == 0) {
taosWUnLockLatch(&pExec->pushHandle.lock);
continue;
}
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pExec->pushHandle.epoch;
((SMqRspHead*)buf)->consumerId = pExec->pushHandle.consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
taosWUnLockLatch(&pExec->pushHandle.lock);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset);
// TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
}
return 0;
}
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version) {
if (msgType != TDMT_VND_SUBMIT) return 0; if (msgType != TDMT_VND_SUBMIT) return 0;
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);
if (data == NULL) { if (data == NULL) {
return -1; return -1;
...@@ -71,6 +204,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi ...@@ -71,6 +204,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
.pCont = data, .pCont = data,
.contLen = msgLen, .contLen = msgLen,
}; };
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
#if 0 #if 0
...@@ -240,6 +374,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -240,6 +374,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReqV2* pReq = pMsg->pCont; SMqPollReqV2* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t waitTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch; int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset; int64_t fetchOffset;
...@@ -265,8 +400,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -265,8 +400,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch); consumerEpoch = atomic_load_32(&pExec->epoch);
...@@ -283,6 +418,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -283,6 +418,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// response to user // response to user
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset); TD_VID(pTq->pVnode), fetchOffset);
#if 0
// add to pushMgr
taosWLockLatch(&pExec->pushHandle.lock);
pExec->pushHandle.consumerId = consumerId;
pExec->pushHandle.epoch = reqEpoch;
pExec->pushHandle.reqOffset = rsp.reqOffset;
pExec->pushHandle.skipLogNum = rsp.skipLogNum;
pExec->pushHandle.handle = pMsg;
taosWUnLockLatch(&pExec->pushHandle.lock);
// TODO add timer
// TODO: the pointer will always be valid?
taosHashPut(pTq->pushMgr, &consumerId, sizeof(int64_t), &pExec, sizeof(void*));
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
return 0;
#endif
break; break;
} }
...@@ -325,7 +482,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -325,7 +482,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.blockNum++; rsp.blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pStreamReader[workerId]; STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0); tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -635,10 +792,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -635,10 +792,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL; req.qmsg = NULL;
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 4; i++) { for (int32_t i = 0; i < 5; i++) {
pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pExec->pStreamReader[i], .reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
}; };
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
......
...@@ -85,11 +85,11 @@ int32_t tsem_wait(tsem_t* sem) { ...@@ -85,11 +85,11 @@ int32_t tsem_wait(tsem_t* sem) {
#include <mach/semaphore.h> #include <mach/semaphore.h>
#include <mach/task.h> #include <mach/task.h>
static TdThread sem_thread; static TdThread sem_thread;
static TdThreadOnce sem_once; static TdThreadOnce sem_once;
static task_t sem_port; static task_t sem_port;
static volatile int sem_inited = 0; static volatile int sem_inited = 0;
static semaphore_t sem_exit; static semaphore_t sem_exit;
static void *sem_thread_routine(void *arg) { static void *sem_thread_routine(void *arg) {
(void)arg; (void)arg;
...@@ -122,12 +122,12 @@ static void once_init(void) { ...@@ -122,12 +122,12 @@ static void once_init(void) {
struct tsem_s { struct tsem_s {
#ifdef SEM_USE_PTHREAD #ifdef SEM_USE_PTHREAD
TdThreadMutex lock; TdThreadMutex lock;
TdThreadCond cond; TdThreadCond cond;
volatile int64_t val; volatile int64_t val;
#elif defined(SEM_USE_POSIX) #elif defined(SEM_USE_POSIX)
size_t id; size_t id;
sem_t * sem; sem_t *sem;
#elif defined(SEM_USE_SEM) #elif defined(SEM_USE_SEM)
semaphore_t sem; semaphore_t sem;
#else // SEM_USE_PTHREAD #else // SEM_USE_PTHREAD
...@@ -140,7 +140,8 @@ struct tsem_s { ...@@ -140,7 +140,8 @@ struct tsem_s {
int tsem_init(tsem_t *sem, int pshared, unsigned int value) { int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
if (*sem) { if (*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p)); struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p));
...@@ -180,20 +181,22 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { ...@@ -180,20 +181,22 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
int e = errno; int e = errno;
if (e == EEXIST) continue; if (e == EEXIST) continue;
if (e == EINTR) continue; if (e == EINTR) continue;
fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e, fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
strerror(e)); e, strerror(e));
abort(); abort();
} while (p->sem == SEM_FAILED); } while (p->sem == SEM_FAILED);
#elif defined(SEM_USE_SEM) #elif defined(SEM_USE_SEM)
taosThreadOnce(&sem_once, once_init); taosThreadOnce(&sem_once, once_init);
if (sem_inited != 1) { if (sem_inited != 1) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
__func__, sem);
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
if (ret != KERN_SUCCESS) { if (ret != KERN_SUCCESS) {
fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
// we fail-fast here, because we have less-doc about semaphore_create for the moment // we fail-fast here, because we have less-doc about semaphore_create for the moment
abort(); abort();
} }
...@@ -224,18 +227,21 @@ int tsem_wait(tsem_t *sem) { ...@@ -224,18 +227,21 @@ int tsem_wait(tsem_t *sem) {
} }
#ifdef SEM_USE_PTHREAD #ifdef SEM_USE_PTHREAD
if (taosThreadMutexLock(&p->lock)) { if (taosThreadMutexLock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
p->val -= 1; p->val -= 1;
if (p->val < 0) { if (p->val < 0) {
if (taosThreadCondWait(&p->cond, &p->lock)) { if (taosThreadCondWait(&p->cond, &p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
} }
if (taosThreadMutexUnlock(&p->lock)) { if (taosThreadMutexUnlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
return 0; return 0;
...@@ -260,18 +266,21 @@ int tsem_post(tsem_t *sem) { ...@@ -260,18 +266,21 @@ int tsem_post(tsem_t *sem) {
} }
#ifdef SEM_USE_PTHREAD #ifdef SEM_USE_PTHREAD
if (taosThreadMutexLock(&p->lock)) { if (taosThreadMutexLock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
p->val += 1; p->val += 1;
if (p->val <= 0) { if (p->val <= 0) {
if (taosThreadCondSignal(&p->cond)) { if (taosThreadCondSignal(&p->cond)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
} }
if (taosThreadMutexUnlock(&p->lock)) { if (taosThreadMutexUnlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
return 0; return 0;
...@@ -293,26 +302,30 @@ int tsem_destroy(tsem_t *sem) { ...@@ -293,26 +302,30 @@ int tsem_destroy(tsem_t *sem) {
} }
struct tsem_s *p = *sem; struct tsem_s *p = *sem;
if (!p->valid) { if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// abort(); // sem); abort();
return 0; return 0;
} }
#ifdef SEM_USE_PTHREAD #ifdef SEM_USE_PTHREAD
if (taosThreadMutexLock(&p->lock)) { if (taosThreadMutexLock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
p->valid = 0; p->valid = 0;
if (taosThreadCondDestroy(&p->cond)) { if (taosThreadCondDestroy(&p->cond)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
if (taosThreadMutexUnlock(&p->lock)) { if (taosThreadMutexUnlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
if (taosThreadMutexDestroy(&p->lock)) { if (taosThreadMutexDestroy(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
sem);
abort(); abort();
} }
#elif defined(SEM_USE_POSIX) #elif defined(SEM_USE_POSIX)
...@@ -321,8 +334,8 @@ int tsem_destroy(tsem_t *sem) { ...@@ -321,8 +334,8 @@ int tsem_destroy(tsem_t *sem) {
int r = sem_unlink(name); int r = sem_unlink(name);
if (r) { if (r) {
int e = errno; int e = errno;
fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e, fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
strerror(e)); e, strerror(e));
abort(); abort();
} }
#elif defined(SEM_USE_SEM) #elif defined(SEM_USE_SEM)
...@@ -424,4 +437,17 @@ int32_t tsem_wait(tsem_t* sem) { ...@@ -424,4 +437,17 @@ int32_t tsem_wait(tsem_t* sem) {
return ret; return ret;
} }
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
int ret = 0;
struct timespec tv = {
.tv_sec = 0,
.tv_nsec = nanosecs,
};
while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue;
return ret;
}
#endif #endif
...@@ -19,16 +19,16 @@ ...@@ -19,16 +19,16 @@
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
#define CACHE_MAX_CAPACITY 1024*1024*16 #define CACHE_MAX_CAPACITY 1024 * 1024 * 16
#define CACHE_DEFAULT_CAPACITY 1024*4 #define CACHE_DEFAULT_CAPACITY 1024 * 4
static TdThread cacheRefreshWorker = {0}; static TdThread cacheRefreshWorker = {0};
static TdThreadOnce cacheThreadInit = PTHREAD_ONCE_INIT; static TdThreadOnce cacheThreadInit = PTHREAD_ONCE_INIT;
static TdThreadMutex guard = PTHREAD_MUTEX_INITIALIZER; static TdThreadMutex guard = PTHREAD_MUTEX_INITIALIZER;
static SArray *pCacheArrayList = NULL; static SArray *pCacheArrayList = NULL;
static bool stopRefreshWorker = false; static bool stopRefreshWorker = false;
static bool refreshWorkerNormalStopped = false; static bool refreshWorkerNormalStopped = false;
static bool refreshWorkerUnexpectedStopped = false; static bool refreshWorkerUnexpectedStopped = false;
typedef struct SCacheNode { typedef struct SCacheNode {
uint64_t addedTime; // the added time when this element is added or updated into cache uint64_t addedTime; // the added time when this element is added or updated into cache
...@@ -36,7 +36,7 @@ typedef struct SCacheNode { ...@@ -36,7 +36,7 @@ typedef struct SCacheNode {
int64_t expireTime; // expire time int64_t expireTime; // expire time
uint64_t signature; uint64_t signature;
struct STrashElem *pTNodeHeader; // point to trash node head struct STrashElem *pTNodeHeader; // point to trash node head
uint16_t keyLen: 15; // max key size: 32kb uint16_t keyLen : 15; // max key size: 32kb
bool inTrashcan : 1; // denote if it is in trash or not bool inTrashcan : 1; // denote if it is in trash or not
uint32_t size; // allocated size for current SCacheNode uint32_t size; // allocated size for current SCacheNode
uint32_t dataLen; uint32_t dataLen;
...@@ -47,8 +47,8 @@ typedef struct SCacheNode { ...@@ -47,8 +47,8 @@ typedef struct SCacheNode {
} SCacheNode; } SCacheNode;
typedef struct SCacheEntry { typedef struct SCacheEntry {
int32_t num; // number of elements in current entry int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch SRWLatch latch; // entry latch
SCacheNode *next; SCacheNode *next;
} SCacheEntry; } SCacheEntry;
...@@ -75,24 +75,24 @@ typedef struct SCacheIter { ...@@ -75,24 +75,24 @@ typedef struct SCacheIter {
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime * when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/ */
struct SCacheObj { struct SCacheObj {
int64_t sizeInBytes; // total allocated buffer in this hash table, SCacheObj is not included. int64_t sizeInBytes; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime; int64_t refreshTime;
char *name; char *name;
SCacheStatis statistics; SCacheStatis statistics;
SCacheEntry *pEntryList; SCacheEntry *pEntryList;
size_t capacity; // number of slots size_t capacity; // number of slots
size_t numOfElems; // number of elements in cache size_t numOfElems; // number of elements in cache
_hash_fn_t hashFp; // hash function _hash_fn_t hashFp; // hash function
__cache_free_fn_t freeFp; __cache_free_fn_t freeFp;
uint32_t numOfElemsInTrash; // number of element in trash uint32_t numOfElemsInTrash; // number of element in trash
STrashElem *pTrash; STrashElem *pTrash;
uint8_t deleting; // set the deleting flag to stop refreshing ASAP. uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
TdThread refreshWorker; TdThread refreshWorker;
bool extendLifespan; // auto extend life span when one item is accessed. bool extendLifespan; // auto extend life span when one item is accessed.
int64_t checkTick; // tick used to record the check times of the refresh threads int64_t checkTick; // tick used to record the check times of the refresh threads
#if defined(LINUX) #if defined(LINUX)
TdThreadRwlock lock; TdThreadRwlock lock;
#else #else
...@@ -182,7 +182,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) { ...@@ -182,7 +182,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
* @return SCacheNode * @return SCacheNode
*/ */
static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
uint64_t duration); uint64_t duration);
/** /**
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
...@@ -267,7 +267,7 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) { ...@@ -267,7 +267,7 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
pEntry->num += 1; pEntry->num += 1;
} }
static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pNode) { static void removeNodeInEntryList(SCacheEntry *pe, SCacheNode *prev, SCacheNode *pNode) {
if (prev == NULL) { if (prev == NULL) {
ASSERT(pe->next == pNode); ASSERT(pe->next == pNode);
pe->next = pNode->pNext; pe->next = pNode->pNext;
...@@ -279,14 +279,14 @@ static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* ...@@ -279,14 +279,14 @@ static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode*
pe->num -= 1; pe->num -= 1;
} }
static FORCE_INLINE SCacheEntry* doFindEntry(SCacheObj* pCacheObj, const void* key, size_t keyLen) { static FORCE_INLINE SCacheEntry *doFindEntry(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
uint32_t hashVal = (*pCacheObj->hashFp)(key, keyLen); uint32_t hashVal = (*pCacheObj->hashFp)(key, keyLen);
int32_t slot = hashVal % pCacheObj->capacity; int32_t slot = hashVal % pCacheObj->capacity;
return &pCacheObj->pEntryList[slot]; return &pCacheObj->pEntryList[slot];
} }
static FORCE_INLINE SCacheNode * static FORCE_INLINE SCacheNode *doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen,
doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode** prev) { SCacheNode **prev) {
SCacheNode *pNode = pe->next; SCacheNode *pNode = pe->next;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && memcmp(pNode->key, key, keyLen) == 0) { if ((pNode->keyLen == keyLen) && memcmp(pNode->key, key, keyLen) == 0) {
...@@ -299,9 +299,9 @@ doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode* ...@@ -299,9 +299,9 @@ doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode*
return pNode; return pNode;
} }
static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { static bool doRemoveExpiredFn(void *param, SCacheNode *pNode) {
SCacheObjTravSup *ps = (SCacheObjTravSup *)param; SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
SCacheObj *pCacheObj = ps->pCacheObj; SCacheObj *pCacheObj = ps->pCacheObj;
if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
...@@ -320,7 +320,7 @@ static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { ...@@ -320,7 +320,7 @@ static bool doRemoveExpiredFn(void *param, SCacheNode* pNode) {
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) { static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
SCacheObjTravSup *ps = (SCacheObjTravSup *)param; SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
SCacheObj *pCacheObj = ps->pCacheObj; SCacheObj *pCacheObj = ps->pCacheObj;
if (T_REF_VAL_GET(pNode) == 0) { if (T_REF_VAL_GET(pNode) == 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
...@@ -347,14 +347,14 @@ static FORCE_INLINE int32_t getCacheCapacity(int32_t length) { ...@@ -347,14 +347,14 @@ static FORCE_INLINE int32_t getCacheCapacity(int32_t length) {
len = (len << 1u); len = (len << 1u);
} }
return len > CACHE_MAX_CAPACITY? CACHE_MAX_CAPACITY:len; return len > CACHE_MAX_CAPACITY ? CACHE_MAX_CAPACITY : len;
} }
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInMs, bool extendLifespan, __cache_free_fn_t fn,
const char *cacheName) { const char *cacheName) {
const int32_t SLEEP_DURATION = 500; // 500 ms const int32_t SLEEP_DURATION = 500; // 500 ms
if (refreshTimeInSeconds <= 0) { if (refreshTimeInMs <= 0) {
return NULL; return NULL;
} }
...@@ -374,10 +374,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -374,10 +374,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
} }
// set free cache node callback function // set free cache node callback function
pCacheObj->hashFp = taosGetDefaultHashFunction(keyType); pCacheObj->hashFp = taosGetDefaultHashFunction(keyType);
pCacheObj->freeFp = fn; pCacheObj->freeFp = fn;
pCacheObj->refreshTime = refreshTimeInSeconds * 1000; pCacheObj->refreshTime = refreshTimeInMs;
pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
if (__trashcan_lock_init(pCacheObj) != 0) { if (__trashcan_lock_init(pCacheObj) != 0) {
...@@ -411,8 +411,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -411,8 +411,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
taosWLockLatch(&pe->latch); taosWLockLatch(&pe->latch);
SCacheNode *prev = NULL; SCacheNode *prev = NULL;
SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev); SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
if (pNode == NULL) { if (pNode == NULL) {
pushfrontNodeInEntryList(pe, pNode1); pushfrontNodeInEntryList(pe, pNode1);
...@@ -460,12 +460,12 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -460,12 +460,12 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
return NULL; return NULL;
} }
SCacheNode *prev = NULL; SCacheNode *prev = NULL;
SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen); SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
taosRLockLatch(&pe->latch); taosRLockLatch(&pe->latch);
SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev); SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev);
if (pNode != NULL) { if (pNode != NULL) {
int32_t ref = T_REF_INC(pNode); int32_t ref = T_REF_INC(pNode);
ASSERT(ref > 0); ASSERT(ref > 0);
...@@ -589,7 +589,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -589,7 +589,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} else { } else {
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
// when reaches here. // when reaches here.
SCacheNode * prev = NULL; SCacheNode *prev = NULL;
SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen); SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen);
taosWLockLatch(&pe->latch); taosWLockLatch(&pe->latch);
...@@ -646,7 +646,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -646,7 +646,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} }
} }
void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) { void doTraverseElems(SCacheObj *pCacheObj, bool (*fp)(void *param, SCacheNode *pNode), SCacheObjTravSup *pSup) {
int32_t numOfEntries = (int32_t)pCacheObj->capacity; int32_t numOfEntries = (int32_t)pCacheObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) { for (int32_t i = 0; i < numOfEntries; ++i) {
SCacheEntry *pEntry = &pCacheObj->pEntryList[i]; SCacheEntry *pEntry = &pCacheObj->pEntryList[i];
...@@ -675,7 +675,7 @@ void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* p ...@@ -675,7 +675,7 @@ void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* p
} }
} }
void taosCacheEmpty(SCacheObj* pCacheObj) { void taosCacheEmpty(SCacheObj *pCacheObj) {
SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
doTraverseElems(pCacheObj, doRemoveNodeFn, &sup); doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
taosTrashcanEmpty(pCacheObj, false); taosTrashcanEmpty(pCacheObj, false);
...@@ -710,20 +710,20 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat ...@@ -710,20 +710,20 @@ SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pDat
return NULL; return NULL;
} }
pNewNode->data = (char*)pNewNode + sizeof(SCacheNode); pNewNode->data = (char *)pNewNode + sizeof(SCacheNode);
pNewNode->dataLen = size; pNewNode->dataLen = size;
memcpy(pNewNode->data, pData, size); memcpy(pNewNode->data, pData, size);
pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size; pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size;
pNewNode->keyLen = (uint16_t)keyLen; pNewNode->keyLen = (uint16_t)keyLen;
memcpy(pNewNode->key, key, keyLen); memcpy(pNewNode->key, key, keyLen);
pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->lifespan = duration; pNewNode->lifespan = duration;
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan; pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
pNewNode->signature = (uint64_t)pNewNode; pNewNode->signature = (uint64_t)pNewNode;
pNewNode->size = (uint32_t)sizeInBytes; pNewNode->size = (uint32_t)sizeInBytes;
return pNewNode; return pNewNode;
} }
...@@ -914,21 +914,19 @@ void taosStopCacheRefreshWorker(void) { ...@@ -914,21 +914,19 @@ void taosStopCacheRefreshWorker(void) {
taosArrayDestroy(pCacheArrayList); taosArrayDestroy(pCacheArrayList);
} }
size_t taosCacheGetNumOfObj(const SCacheObj* pCacheObj) { size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj) { return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; }
return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash;
}
SCacheIter* taosCacheCreateIter(const SCacheObj* pCacheObj) { SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj) {
ASSERT(pCacheObj != NULL); ASSERT(pCacheObj != NULL);
SCacheIter* pIter = taosMemoryCalloc(1, sizeof(SCacheIter)); SCacheIter *pIter = taosMemoryCalloc(1, sizeof(SCacheIter));
pIter->pCacheObj = (SCacheObj*) pCacheObj; pIter->pCacheObj = (SCacheObj *)pCacheObj;
pIter->entryIndex = -1; pIter->entryIndex = -1;
pIter->index = -1; pIter->index = -1;
return pIter; return pIter;
} }
bool taosCacheIterNext(SCacheIter* pIter) { bool taosCacheIterNext(SCacheIter *pIter) {
SCacheObj* pCacheObj = pIter->pCacheObj; SCacheObj *pCacheObj = pIter->pCacheObj;
if (pIter->index + 1 >= pIter->numOfObj) { if (pIter->index + 1 >= pIter->numOfObj) {
if (pIter->entryIndex + 1 >= pCacheObj->capacity) { if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
...@@ -936,9 +934,9 @@ bool taosCacheIterNext(SCacheIter* pIter) { ...@@ -936,9 +934,9 @@ bool taosCacheIterNext(SCacheIter* pIter) {
} }
// release the reference for all objects in the snapshot // release the reference for all objects in the snapshot
for(int32_t i = 0; i < pIter->numOfObj; ++i) { for (int32_t i = 0; i < pIter->numOfObj; ++i) {
char* p= pIter->pCurrent[i]->data; char *p = pIter->pCurrent[i]->data;
taosCacheRelease(pCacheObj, (void**) &p, false); taosCacheRelease(pCacheObj, (void **)&p, false);
pIter->pCurrent[i] = NULL; pIter->pCurrent[i] = NULL;
} }
...@@ -967,7 +965,7 @@ bool taosCacheIterNext(SCacheIter* pIter) { ...@@ -967,7 +965,7 @@ bool taosCacheIterNext(SCacheIter* pIter) {
pIter->pCurrent = (SCacheNode **)tmp; pIter->pCurrent = (SCacheNode **)tmp;
} }
SCacheNode* pNode = pEntry->next; SCacheNode *pNode = pEntry->next;
for (int32_t i = 0; i < pEntry->num; ++i) { for (int32_t i = 0; i < pEntry->num; ++i) {
ASSERT(pNode != NULL); ASSERT(pNode != NULL);
...@@ -981,7 +979,7 @@ bool taosCacheIterNext(SCacheIter* pIter) { ...@@ -981,7 +979,7 @@ bool taosCacheIterNext(SCacheIter* pIter) {
pIter->numOfObj = pEntry->num; pIter->numOfObj = pEntry->num;
taosRUnLockLatch(&pEntry->latch); taosRUnLockLatch(&pEntry->latch);
pIter->index = -1; pIter->index = -1;
break; break;
} }
} }
...@@ -990,19 +988,19 @@ bool taosCacheIterNext(SCacheIter* pIter) { ...@@ -990,19 +988,19 @@ bool taosCacheIterNext(SCacheIter* pIter) {
return true; return true;
} }
void* taosCacheIterGetData(const SCacheIter* pIter, size_t* len) { void *taosCacheIterGetData(const SCacheIter *pIter, size_t *len) {
SCacheNode* pNode = pIter->pCurrent[pIter->index]; SCacheNode *pNode = pIter->pCurrent[pIter->index];
*len = pNode->dataLen; *len = pNode->dataLen;
return pNode->data; return pNode->data;
} }
void* taosCacheIterGetKey(const SCacheIter* pIter, size_t* len) { void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) {
SCacheNode* pNode = pIter->pCurrent[pIter->index]; SCacheNode *pNode = pIter->pCurrent[pIter->index];
*len = pNode->keyLen; *len = pNode->keyLen;
return pNode->key; return pNode->key;
} }
void taosCacheDestroyIter(SCacheIter* pIter) { void taosCacheDestroyIter(SCacheIter *pIter) {
taosMemoryFreeClear(pIter->pCurrent); taosMemoryFreeClear(pIter->pCurrent);
taosMemoryFreeClear(pIter); taosMemoryFreeClear(pIter);
} }
\ No newline at end of file
#include <iostream>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <iostream>
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
...@@ -8,132 +8,134 @@ ...@@ -8,132 +8,134 @@
// test cache // test cache
TEST(cacheTest, client_cache_test) { TEST(cacheTest, client_cache_test) {
const int32_t REFRESH_TIME_IN_SEC = 2; const int32_t REFRESH_TIME_IN_SEC = 2;
SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, 0, NULL, "test"); SCacheObj* tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC * 1000, 0, NULL, "test");
const char* key1 = "test1"; const char* key1 = "test1";
char data1[] = "test11"; char data1[] = "test11";
char* cachedObj = (char*) taosCachePut(tscMetaCache, key1, strlen(key1), data1, strlen(data1)+1, 1); char* cachedObj = (char*)taosCachePut(tscMetaCache, key1, strlen(key1), data1, strlen(data1) + 1, 1);
taosSsleep(REFRESH_TIME_IN_SEC+1); taosSsleep(REFRESH_TIME_IN_SEC + 1);
printf("obj is still valid: %s\n", cachedObj); printf("obj is still valid: %s\n", cachedObj);
char data2[] = "test22"; char data2[] = "test22";
taosCacheRelease(tscMetaCache, (void**) &cachedObj, false); taosCacheRelease(tscMetaCache, (void**)&cachedObj, false);
/* the object is cleared by cache clean operation */ /* the object is cleared by cache clean operation */
cachedObj = (char*) taosCachePut(tscMetaCache, key1, strlen(key1), data2, strlen(data2)+1, 20); cachedObj = (char*)taosCachePut(tscMetaCache, key1, strlen(key1), data2, strlen(data2) + 1, 20);
printf("after updated: %s\n", cachedObj); printf("after updated: %s\n", cachedObj);
printf("start to remove data from cache\n"); printf("start to remove data from cache\n");
taosCacheRelease(tscMetaCache, (void**) &cachedObj, false); taosCacheRelease(tscMetaCache, (void**)&cachedObj, false);
printf("end of removing data from cache\n"); printf("end of removing data from cache\n");
const char* key3 = "test2"; const char* key3 = "test2";
const char* data3 = "kkkkkkk"; const char* data3 = "kkkkkkk";
char* cachedObj2 = (char*) taosCachePut(tscMetaCache, key3, strlen(key3), data3, strlen(data3) + 1, 1); char* cachedObj2 = (char*)taosCachePut(tscMetaCache, key3, strlen(key3), data3, strlen(data3) + 1, 1);
printf("%s\n", cachedObj2); printf("%s\n", cachedObj2);
taosCacheRelease(tscMetaCache, (void**) &cachedObj2, false); taosCacheRelease(tscMetaCache, (void**)&cachedObj2, false);
taosSsleep(3); taosSsleep(3);
char* d = (char*) taosCacheAcquireByKey(tscMetaCache, key3, strlen(key3)); char* d = (char*)taosCacheAcquireByKey(tscMetaCache, key3, strlen(key3));
assert(d == NULL); assert(d == NULL);
char key5[] = "test5"; char key5[] = "test5";
char data5[] = "data5kkkkk"; char data5[] = "data5kkkkk";
cachedObj2 = (char*) taosCachePut(tscMetaCache, key5, strlen(key5), data5, strlen(data5) + 1, 20); cachedObj2 = (char*)taosCachePut(tscMetaCache, key5, strlen(key5), data5, strlen(data5) + 1, 20);
const char* data6= "new Data after updated"; const char* data6 = "new Data after updated";
taosCacheRelease(tscMetaCache, (void**) &cachedObj2, false); taosCacheRelease(tscMetaCache, (void**)&cachedObj2, false);
cachedObj2 = (char*) taosCachePut(tscMetaCache, key5, strlen(key5), data6, strlen(data6) + 1, 20); cachedObj2 = (char*)taosCachePut(tscMetaCache, key5, strlen(key5), data6, strlen(data6) + 1, 20);
printf("%s\n", cachedObj2); printf("%s\n", cachedObj2);
taosCacheRelease(tscMetaCache, (void**) &cachedObj2, true); taosCacheRelease(tscMetaCache, (void**)&cachedObj2, true);
const char* data7 = "add call update procedure"; const char* data7 = "add call update procedure";
cachedObj2 = (char*) taosCachePut(tscMetaCache, key5, strlen(key5), data7, strlen(data7) + 1, 20); cachedObj2 = (char*)taosCachePut(tscMetaCache, key5, strlen(key5), data7, strlen(data7) + 1, 20);
printf("%s\n=======================================\n\n", cachedObj2); printf("%s\n=======================================\n\n", cachedObj2);
char* cc = (char*) taosCacheAcquireByKey(tscMetaCache, key5, strlen(key5)); char* cc = (char*)taosCacheAcquireByKey(tscMetaCache, key5, strlen(key5));
taosCacheRelease(tscMetaCache, (void**) &cachedObj2, true); taosCacheRelease(tscMetaCache, (void**)&cachedObj2, true);
taosCacheRelease(tscMetaCache, (void**) &cc, false); taosCacheRelease(tscMetaCache, (void**)&cc, false);
const char* data8 = "ttft"; const char* data8 = "ttft";
const char* key6 = "key6"; const char* key6 = "key6";
char* ft = (char*) taosCachePut(tscMetaCache, key6, strlen(key6), data8, strlen(data8), 20); char* ft = (char*)taosCachePut(tscMetaCache, key6, strlen(key6), data8, strlen(data8), 20);
taosCacheRelease(tscMetaCache, (void**) &ft, false); taosCacheRelease(tscMetaCache, (void**)&ft, false);
/** /**
* 140ns * 140ns
*/ */
uint64_t startTime = taosGetTimestampUs(); uint64_t startTime = taosGetTimestampUs();
printf("Cache Performance Test\nstart time:%" PRIu64 "\n", startTime); printf("Cache Performance Test\nstart time:%" PRIu64 "\n", startTime);
for(int32_t i=0; i<1000; ++i) { for (int32_t i = 0; i < 1000; ++i) {
char* dd = (char*) taosCacheAcquireByKey(tscMetaCache, key6, strlen(key6)); char* dd = (char*)taosCacheAcquireByKey(tscMetaCache, key6, strlen(key6));
if (dd != NULL) { if (dd != NULL) {
// printf("get the data\n"); // printf("get the data\n");
} else { } else {
printf("data has been released\n"); printf("data has been released\n");
} }
taosCacheRelease(tscMetaCache, (void**) &dd, false); taosCacheRelease(tscMetaCache, (void**)&dd, false);
} }
uint64_t endTime = taosGetTimestampUs(); uint64_t endTime = taosGetTimestampUs();
int64_t el = endTime - startTime; int64_t el = endTime - startTime;
printf("End of Test, %" PRIu64 "\nTotal Elapsed Time:%" PRIu64 " us.avg:%f us\n", endTime, el, el/1000.0); printf("End of Test, %" PRIu64 "\nTotal Elapsed Time:%" PRIu64 " us.avg:%f us\n", endTime, el, el / 1000.0);
taosCacheCleanup(tscMetaCache); taosCacheCleanup(tscMetaCache);
} }
TEST(cacheTest, cache_iter_test) { TEST(cacheTest, cache_iter_test) {
const int32_t REFRESH_TIME_IN_SEC = 2; const int32_t REFRESH_TIME_IN_SEC = 2;
auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC, false, NULL, "test"); auto* pCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, REFRESH_TIME_IN_SEC * 1000, false, NULL, "test");
char key[256] = {0}; char key[256] = {0};
char data[1024] = "abcdefghijk"; char data[1024] = "abcdefghijk";
// int32_t len = strlen(data); // int32_t len = strlen(data);
uint64_t startTime = taosGetTimestampUs(); uint64_t startTime = taosGetTimestampUs();
int32_t num = 10000; int32_t num = 10000;
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
int32_t len = sprintf(key, "abc_%7d", i); int32_t len = sprintf(key, "abc_%7d", i);
taosCachePut(pCache, key, strlen(key), data, len, 3600); taosCachePut(pCache, key, strlen(key), data, len, 3600);
} }
uint64_t endTime = taosGetTimestampUs(); uint64_t endTime = taosGetTimestampUs();
printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime, (endTime-startTime)/(double)num); printf("add %d object cost:%" PRIu64 " us, avg:%f us\n", num, endTime - startTime,
(endTime - startTime) / (double)num);
startTime = taosGetTimestampUs(); startTime = taosGetTimestampUs();
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
int32_t len = sprintf(key, "abc_%7d", i); int32_t len = sprintf(key, "abc_%7d", i);
void* k = taosCacheAcquireByKey(pCache, key, len); void* k = taosCacheAcquireByKey(pCache, key, len);
assert(k != 0); assert(k != 0);
} }
endTime = taosGetTimestampUs(); endTime = taosGetTimestampUs();
printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime, (endTime - startTime)/(double)num); printf("retrieve %d object cost:%" PRIu64 " us,avg:%f\n", num, endTime - startTime,
(endTime - startTime) / (double)num);
int32_t count = 0; int32_t count = 0;
SCacheIter* pIter = taosCacheCreateIter(pCache); SCacheIter* pIter = taosCacheCreateIter(pCache);
while(taosCacheIterNext(pIter)) { while (taosCacheIterNext(pIter)) {
size_t keyLen = 0; size_t keyLen = 0;
size_t dataLen = 0; size_t dataLen = 0;
char* key1 = static_cast<char*>(taosCacheIterGetKey(pIter, &keyLen)); char* key1 = static_cast<char*>(taosCacheIterGetKey(pIter, &keyLen));
char* data1 = static_cast<char*>(taosCacheIterGetData(pIter, &dataLen)); char* data1 = static_cast<char*>(taosCacheIterGetData(pIter, &dataLen));
// char d[256] = {0}; // char d[256] = {0};
// memcpy(d, data1, dataLen); // memcpy(d, data1, dataLen);
// char k[256] = {0}; // char k[256] = {0};
// memcpy(k, key1, keyLen); // memcpy(k, key1, keyLen);
} }
ASSERT_EQ(count, num); ASSERT_EQ(count, num);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册