diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index bb111d2da0da75e6a3e3812ac21364f9a18fb6f3..7ef91be3d92e536a0a7fe2a927ee7d499e1612ff 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -28,7 +28,7 @@ extern "C" { #else #define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code)); #endif - + #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) #define TAOS_SUCCEEDED(err) ((err) >= 0) #define TAOS_FAILED(err) ((err) < 0) @@ -37,7 +37,7 @@ const char* tstrerror(int32_t err); int32_t* taosGetErrno(); #define terrno (*taosGetErrno()) - + #define TSDB_CODE_SUCCESS 0 #ifdef TAOS_ERROR_C @@ -74,6 +74,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "Memory cor TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "Invalid config message") TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, 0, 0x0104, "Data file corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, 0, 0x0105, "Ref out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, 0, 0x0106, "too many Ref Objs") +TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, 0, 0x0107, "Ref ID is removed") +TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, 0, 0x0108, "Invalid Ref ID") +TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, 0, 0x0119, "Ref is already there") +TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, 0, 0x0110, "Ref is not there") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "Invalid SQL statement") @@ -182,7 +188,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "No permission for disk files in dnode") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "Invalid message length") -// vnode +// vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_VND_MSG_NOT_PROCESSED, 0, 0x0501, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_NEED_REPROCESSED, 0, 0x0502, "Action need to be reprocessed") @@ -354,23 +360,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, 0, 0x11A4, "tag value TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, 0, 0x11A5, "value not find") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, 0, 0x11A6, "value type should be boolean, number or string") - -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, 0, 0x2101, "out of memory") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_UNDEF, 0, 0x2102, "convertion undefined") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_TRUNC, 0, 0x2103, "convertion truncated") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_NOT_SUPPORT, 0, 0x2104, "convertion not supported") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OUT_OF_RANGE, 0, 0x2105, "out of range") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_NOT_SUPPORT, 0, 0x2106, "not supported yet") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_INVALID_HANDLE, 0, 0x2107, "invalid handle") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_NO_RESULT, 0, 0x2108, "no result set") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_NO_FIELDS, 0, 0x2109, "no fields returned") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_INVALID_CURSOR, 0, 0x2110, "invalid cursor") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_STATEMENT_NOT_READY, 0, 0x2111, "statement not ready") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONNECTION_BUSY, 0, 0x2112, "connection still busy") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_BAD_CONNSTR, 0, 0x2113, "bad connection string") -TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_BAD_ARG, 0, 0x2114, "bad argument") - - #ifdef TAOS_ERROR_C }; #endif diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h new file mode 100644 index 0000000000000000000000000000000000000000..6619ff407e2aad2322e952ce3ef43c87eba64bb2 --- /dev/null +++ b/src/util/inc/tref.h @@ -0,0 +1,38 @@ + +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TREF_H +#define TDENGINE_TREF_H + +#ifdef __cplusplus +extern "C" { +#endif + +int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs +void taosCloseRef(int refId); +int taosListRef(); // return the number of references in system +int taosAddRef(int refId, void *p); +int taosAcquireRef(int refId, void *p); +void taosReleaseRef(int refId, void *p); + +#define taosRemoveRef taosReleaseRef + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TREF_H diff --git a/src/util/src/tref.c b/src/util/src/tref.c new file mode 100644 index 0000000000000000000000000000000000000000..5e0a85fdb6f64cb47a31c70369acfc88fa815a5f --- /dev/null +++ b/src/util/src/tref.c @@ -0,0 +1,402 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "taoserror.h" +#include "tulog.h" +#include "tutil.h" + +#define TSDB_REF_OBJECTS 50 +#define TSDB_REF_STATE_EMPTY 0 +#define TSDB_REF_STATE_ACTIVE 1 +#define TSDB_REF_STATE_DELETED 2 + +typedef struct SRefNode { + struct SRefNode *prev; + struct SRefNode *next; + void *p; + int32_t count; +} SRefNode; + +typedef struct { + SRefNode **nodeList; + int state; // 0: empty, 1: active; 2: deleted + int refId; + int max; + int32_t count; // total number of SRefNodes in this set + int64_t *lockedBy; + void (*fp)(void *); +} SRefSet; + +static SRefSet tsRefSetList[TSDB_REF_OBJECTS]; +static pthread_once_t tsRefModuleInit = PTHREAD_ONCE_INIT; +static pthread_mutex_t tsRefMutex; +static int tsRefSetNum = 0; +static int tsNextId = 0; + +static void taosInitRefModule(void); +static int taosHashRef(SRefSet *pSet, void *p); +static void taosLockList(int64_t *lockedBy); +static void taosUnlockList(int64_t *lockedBy); +static void taosIncRefCount(SRefSet *pSet); +static void taosDecRefCount(SRefSet *pSet); + +int taosOpenRef(int max, void (*fp)(void *)) +{ + SRefNode **nodeList; + SRefSet *pSet; + int64_t *lockedBy; + int i, refId; + + pthread_once(&tsRefModuleInit, taosInitRefModule); + + nodeList = calloc(sizeof(SRefNode *), (size_t)max); + if (nodeList == NULL) { + return TSDB_CODE_REF_NO_MEMORY; + } + + lockedBy = calloc(sizeof(int64_t), (size_t)max); + if (lockedBy == NULL) { + free(nodeList); + return TSDB_CODE_REF_NO_MEMORY; + } + + pthread_mutex_lock(&tsRefMutex); + + for (i = 0; i < TSDB_REF_OBJECTS; ++i) { + tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; + if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; + } + + if (i < TSDB_REF_OBJECTS) { + refId = tsNextId; + pSet = tsRefSetList + refId; + taosIncRefCount(pSet); + pSet->max = max; + pSet->nodeList = nodeList; + pSet->lockedBy = lockedBy; + pSet->fp = fp; + pSet->state = TSDB_REF_STATE_ACTIVE; + pSet->refId = refId; + + tsRefSetNum++; + uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum); + } else { + refId = TSDB_CODE_REF_FULL; + free (nodeList); + free (lockedBy); + uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); + } + + pthread_mutex_unlock(&tsRefMutex); + + return refId; +} + +void taosCloseRef(int refId) +{ + SRefSet *pSet; + int deleted = 0; + + if (refId < 0 || refId >= TSDB_REF_OBJECTS) { + uTrace("refId:%d is invalid, out of range", refId); + return; + } + + pSet = tsRefSetList + refId; + + pthread_mutex_lock(&tsRefMutex); + + if (pSet->state == TSDB_REF_STATE_ACTIVE) { + pSet->state = TSDB_REF_STATE_DELETED; + deleted = 1; + uTrace("refId:%d is closed, count:%d", refId, pSet->count); + } else { + uTrace("refId:%d is already closed, count:%d", refId, pSet->count); + } + + pthread_mutex_unlock(&tsRefMutex); + + if (deleted) taosDecRefCount(pSet); +} + +int taosAddRef(int refId, void *p) +{ + int hash; + SRefNode *pNode; + SRefSet *pSet; + + if (refId < 0 || refId >= TSDB_REF_OBJECTS) { + uTrace("refId:%d p:%p failed to add, refId not valid", refId, p); + return TSDB_CODE_REF_INVALID_ID; + } + + uTrace("refId:%d p:%p try to add", refId, p); + + pSet = tsRefSetList + refId; + taosIncRefCount(pSet); + if (pSet->state != TSDB_REF_STATE_ACTIVE) { + taosDecRefCount(pSet); + uTrace("refId:%d p:%p failed to add, not active", refId, p); + return TSDB_CODE_REF_ID_REMOVED; + } + + int code = 0; + hash = taosHashRef(pSet, p); + + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while ( pNode ) { + if ( pNode->p == p ) + break; + + pNode = pNode->next; + } + + if (pNode) { + code = TSDB_CODE_REF_ALREADY_EXIST; + uTrace("refId:%d p:%p is already there, faild to add", refId, p); + } else { + pNode = calloc(sizeof(SRefNode), 1); + if (pNode) { + pNode->p = p; + pNode->count = 1; + pNode->prev = 0; + pNode->next = pSet->nodeList[hash]; + pSet->nodeList[hash] = pNode; + uTrace("refId:%d p:%p is added, count::%d", refId, p, pSet->count); + } else { + code = TSDB_CODE_REF_NO_MEMORY; + uTrace("refId:%d p:%p is not added, since no memory", refId, p); + } + } + + if (code < 0) taosDecRefCount(pSet); + + taosUnlockList(pSet->lockedBy+hash); + + return code; +} + +int taosAcquireRef(int refId, void *p) +{ + int hash, code = 0; + SRefNode *pNode; + SRefSet *pSet; + + if ( refId < 0 || refId >= TSDB_REF_OBJECTS ) { + uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p); + return TSDB_CODE_REF_INVALID_ID; + } + + uTrace("refId:%d p:%p try to acquire", refId, p); + + pSet = tsRefSetList + refId; + taosIncRefCount(pSet); + if (pSet->state != TSDB_REF_STATE_ACTIVE) { + uTrace("refId:%d p:%p failed to acquire, not active", refId, p); + taosDecRefCount(pSet); + return TSDB_CODE_REF_ID_REMOVED; + } + + hash = taosHashRef(pSet, p); + + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + + while (pNode) { + if (pNode->p == p) { + break; + } + + pNode = pNode->next; + } + + if (pNode) { + pNode->count++; + uTrace("refId:%d p:%p is acquired", refId, p); + } else { + code = TSDB_CODE_REF_NOT_EXIST; + uTrace("refId:%d p:%p is not there, failed to acquire", refId, p); + } + + taosUnlockList(pSet->lockedBy+hash); + + taosDecRefCount(pSet); + + return code; +} + +void taosReleaseRef(int refId, void *p) +{ + int hash; + SRefNode *pNode; + SRefSet *pSet; + int released = 0; + + if (refId < 0 || refId >= TSDB_REF_OBJECTS) { + uTrace("refId:%d p:%p failed to release, refId not valid", refId, p); + return; + } + + uTrace("refId:%d p:%p try to release", refId, p); + + pSet = tsRefSetList + refId; + if (pSet->state == TSDB_REF_STATE_EMPTY) { + uTrace("refId:%d p:%p failed to release, cleaned", refId, p); + return; + } + + hash = taosHashRef(pSet, p); + + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if ( pNode->p == p ) + break; + + pNode = pNode->next; + } + + if (pNode) { + pNode->count--; + + if (pNode->count == 0) { + if ( pNode->prev ) { + pNode->prev->next = pNode->next; + } else { + pSet->nodeList[hash] = pNode->next; + } + + if ( pNode->next ) { + pNode->next->prev = pNode->prev; + } + + (*pSet->fp)(pNode->p); + + free(pNode); + released = 1; + uTrace("refId:%d p:%p is removed, count::%d", refId, p, pSet->count); + } else { + uTrace("refId:%d p:%p is released", refId, p); + } + } else { + uTrace("refId:%d p:%p is not there, failed to release", refId, p); + } + + taosUnlockList(pSet->lockedBy+hash); + + if (released) taosDecRefCount(pSet); +} + +int taosListRef() { + SRefSet *pSet; + SRefNode *pNode; + int num = 0; + + pthread_mutex_lock(&tsRefMutex); + + for (int i = 0; i < TSDB_REF_OBJECTS; ++i) { + pSet = tsRefSetList + i; + + if (pSet->state == TSDB_REF_STATE_EMPTY) + continue; + + uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count); + + for (int j=0; j < pSet->max; ++j) { + pNode = pSet->nodeList[j]; + + while (pNode) { + uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count); + pNode = pNode->next; + num++; + } + } + } + + pthread_mutex_unlock(&tsRefMutex); + + return num; +} + +static int taosHashRef(SRefSet *pSet, void *p) +{ + int hash = 0; + int64_t v = (int64_t)p; + + for (int i = 0; i < sizeof(v); ++i) { + hash += (int)(v & 0xFFFF); + v = v >> 16; + i = i + 2; + } + + hash = hash % pSet->max; + + return hash; +} + +static void taosLockList(int64_t *lockedBy) { + int64_t tid = taosGetPthreadId(); + int i = 0; + while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { + if (++i % 100 == 0) { + sched_yield(); + } + } +} + +static void taosUnlockList(int64_t *lockedBy) { + int64_t tid = taosGetPthreadId(); + if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { + assert(false); + } +} + +static void taosInitRefModule(void) { + pthread_mutex_init(&tsRefMutex, NULL); +} + +static void taosIncRefCount(SRefSet *pSet) { + atomic_add_fetch_32(&pSet->count, 1); + uTrace("refId:%d inc count:%d", pSet->refId, pSet->count); +} + +static void taosDecRefCount(SRefSet *pSet) { + int32_t count = atomic_sub_fetch_32(&pSet->count, 1); + uTrace("refId:%d dec count:%d", pSet->refId, pSet->count); + + if (count > 0) return; + + pthread_mutex_lock(&tsRefMutex); + + if (pSet->state != TSDB_REF_STATE_EMPTY) { + pSet->state = TSDB_REF_STATE_EMPTY; + pSet->max = 0; + pSet->fp = NULL; + + taosTFree(pSet->nodeList); + taosTFree(pSet->lockedBy); + + tsRefSetNum--; + uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count); + } + + pthread_mutex_unlock(&tsRefMutex); +} + diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c new file mode 100644 index 0000000000000000000000000000000000000000..486f9f6d6d8cbde6f130aebc96efb209c5b26997 --- /dev/null +++ b/src/util/tests/trefTest.c @@ -0,0 +1,166 @@ +#include +#include +#include +#include +#include +#include "os.h" +#include "tref.h" +#include "tlog.h" +#include "tglobal.h" +#include "taoserror.h" +#include "tulog.h" + +typedef struct { + int refNum; + int steps; + int refId; + void **p; +} SRefSpace; + +void *takeRefActions(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int code, id; + + for (int i=0; i < pSpace->steps; ++i) { + printf("s"); + id = random() % pSpace->refNum; + code = taosAddRef(pSpace->refId, pSpace->p[id]); + usleep(1); + + id = random() % pSpace->refNum; + code = taosAcquireRef(pSpace->refId, pSpace->p[id]); + if (code >= 0) { + usleep(id % 5 + 1); + taosReleaseRef(pSpace->refId, pSpace->p[id]); + } + + id = random() % pSpace->refNum; + taosRemoveRef(pSpace->refId, pSpace->p[id]); + usleep(id %5 + 1); + + id = random() % pSpace->refNum; + code = taosAcquireRef(pSpace->refId, pSpace->p[id]); + if (code >= 0) { + usleep(id % 5 + 1); + taosReleaseRef(pSpace->refId, pSpace->p[id]); + } + } + + for (int i=0; i < pSpace->refNum; ++i) { + taosRemoveRef(pSpace->refId, pSpace->p[i]); + } + + //uInfo("refId:%d thread exits", pSpace->refId); + + return NULL; +} + +void myfree(void *p) { + return; +} + +void *openRefSpace(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + + printf("c"); + pSpace->refId = taosOpenRef(10000, myfree); + + if (pSpace->refId < 0) { + printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); + return NULL; + } + + pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum); + for (int i=0; irefNum; ++i) { + pSpace->p[i] = (void *) malloc(128); + } + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + pthread_t thread1, thread2, thread3; + pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace)); + pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace)); + pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace)); + + pthread_join(thread1, NULL); + pthread_join(thread2, NULL); + pthread_join(thread3, NULL); + + taosCloseRef(pSpace->refId); + + for (int i=0; irefNum; ++i) { + free(pSpace->p[i]); + } + + uInfo("refId:%d main thread exit", pSpace->refId); + free(pSpace->p); + pSpace->p = NULL; + + return NULL; +} + +int main(int argc, char *argv[]) { + int refNum = 100; + int threads = 10; + int steps = 10000; + int loops = 1; + + uDebugFlag = 143; + + for (int i=1; i