提交 a7d9dae7 编写于 作者: S Shengliang Guan

Merge branch 'feature/wal' of https://github.com/taosdata/TDengine into feature/wal

...@@ -396,14 +396,15 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) { ...@@ -396,14 +396,15 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
atomic_add_fetch_32(&pStable->numOfTables, 1); atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) { if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
mDebug("table:%s, create hash:%p", pStable->info.tableId, pStable->vgHash);
} }
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) { if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, mDebug("table:%s, vgId:%d is put into stable hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash)); pStable->vgHash, taosHashGetSize(pStable->vgHash));
} }
} }
} }
...@@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable) ...@@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)); taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, mDebug("table:%s, vgId:%d is remove from stable hash:%p sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
(int32_t)taosHashGetSize(pStable->vgHash)); pStable->vgHash, taosHashGetSize(pStable->vgHash));
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
} }
static void mnodeDestroySuperTable(SSTableObj *pStable) { static void mnodeDestroySuperTable(SSTableObj *pStable) {
mDebug("table:%s, is destroyed, stable hash:%p", pStable->info.tableId, pStable->vgHash);
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
taosHashCleanup(pStable->vgHash); taosHashCleanup(pStable->vgHash);
pStable->vgHash = NULL; pStable->vgHash = NULL;
...@@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { ...@@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
SSTableObj *pNew = pRow->pObj; SSTableObj *pNew = pRow->pObj;
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
if (pTable != NULL && pTable != pNew) { if (pTable != NULL && pTable != pNew) {
mDebug("table:%s, will be updated, hash:%p sizeOfVgList:%d, new hash:%p sizeOfVgList:%d", pTable->info.tableId,
pTable->vgHash, taosHashGetSize(pTable->vgHash), pNew->vgHash, taosHashGetSize(pNew->vgHash));
void *oldTableId = pTable->info.tableId; void *oldTableId = pTable->info.tableId;
void *oldSchema = pTable->schema; void *oldSchema = pTable->schema;
void *oldVgHash = pTable->vgHash; void *oldVgHash = pTable->vgHash;
...@@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { ...@@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
free(pNew); free(pNew);
free(oldTableId); free(oldTableId);
free(oldSchema); free(oldSchema);
mDebug("table:%s, update finished, hash:%p sizeOfVgList:%d", pTable->info.tableId, pTable->vgHash,
taosHashGetSize(pTable->vgHash));
} }
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
...@@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { ...@@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable->type == TSDB_SUPER_TABLE) { if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
SSTableObj *pSTable = (SSTableObj *)pMsg->pTable; SSTableObj *pSTable = (SSTableObj *)pMsg->pTable;
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", pMsg,
pMsg, pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash)); pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, taosHashGetSize(pSTable->vgHash));
return mnodeProcessDropSuperTableMsg(pMsg); return mnodeProcessDropSuperTableMsg(pMsg);
} else { } else {
SCTableObj *pCTable = (SCTableObj *)pMsg->pTable; SCTableObj *pCTable = (SCTableObj *)pMsg->pTable;
...@@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable; SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) { mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL); int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL);
while (pVgId) { while (pVgId) {
SVgObj *pVgroup = mnodeGetVgroup(*pVgId); SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
...@@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
pDrop->uid = htobe64(pStable->uid); pDrop->uid = htobe64(pStable->uid);
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId); mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle, mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d, hash:%p sizeOfVgList:%d", pMsg,
pStable->info.tableId, pVgroup->vgId); pMsg->rpcMsg.ahandle, pStable->info.tableId, pVgroup->vgId, pStable->vgHash,
taosHashGetSize(pStable->vgHash));
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
dnodeSendMsgToDnode(&epSet, &rpcMsg); dnodeSendMsgToDnode(&epSet, &rpcMsg);
...@@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { ...@@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pMeta; pMsg->rpcRsp.rsp = pMeta;
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg, pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved, sizeOfVgList:%d numOfTables:%d", pMsg,
pTable->info.tableId, pTable->uid); pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->uid, taosHashGetSize(pTable->vgHash), pTable->numOfTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg); char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) { for (int32_t i = 0; i < numOfTable; ++i) {
char * stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
SSTableObj *pTable = mnodeGetSuperTable(stableName); SSTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) { if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName); mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
...@@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { ...@@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
msg += sizeof(SVgroupsMsg); msg += sizeof(SVgroupsMsg);
} else { } else {
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
int32_t vgSize = 0; int32_t vgSize = 0;
......
...@@ -79,7 +79,7 @@ bool httpInitContexts() { ...@@ -79,7 +79,7 @@ bool httpInitContexts() {
void httpCleanupContexts() { void httpCleanupContexts() {
if (tsHttpServer.contextCache != NULL) { if (tsHttpServer.contextCache != NULL) {
SCacheObj *cache = tsHttpServer.contextCache; SCacheObj *cache = tsHttpServer.contextCache;
httpInfo("context cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable)); httpInfo("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache); taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL; tsHttpServer.contextCache = NULL;
} }
......
...@@ -107,7 +107,7 @@ static void httpDestroySession(void *data) { ...@@ -107,7 +107,7 @@ static void httpDestroySession(void *data) {
void httpCleanUpSessions() { void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) { if (tsHttpServer.sessionCache != NULL) {
SCacheObj *cache = tsHttpServer.sessionCache; SCacheObj *cache = tsHttpServer.sessionCache;
httpInfo("session cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable)); httpInfo("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.sessionCache); taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL; tsHttpServer.sessionCache = NULL;
} }
......
...@@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp ...@@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
* @param pHashObj * @param pHashObj
* @return * @return
*/ */
size_t taosHashGetSize(const SHashObj *pHashObj); int32_t taosHashGetSize(const SHashObj *pHashObj);
/** /**
* put element into hash table, if the element with the same key exists, update it * put element into hash table, if the element with the same key exists, update it
......
...@@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp ...@@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
return pHashObj; return pHashObj;
} }
size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; } int32_t taosHashGetSize(const SHashObj *pHashObj) { return (int32_t)((pHashObj == NULL) ? 0 : pHashObj->size); }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
......
...@@ -24,15 +24,17 @@ ...@@ -24,15 +24,17 @@
#define GREEN "\033[1;32m" #define GREEN "\033[1;32m"
#define NC "\033[0m" #define NC "\033[0m"
int32_t capacity = 100000; int32_t capacity = 128;
int32_t q1Times = 1; int32_t q1Times = 10;
int32_t q2Times = 1; int32_t q2Times = 10;
int32_t keyNum = 100000; int32_t keyNum = 100000;
int32_t printInterval = 10000; int32_t printInterval = 1000;
void * hashHandle;
pthread_t thread;
typedef struct HashTestRow { typedef struct HashTestRow {
int32_t size; int32_t keySize;
void * ptr; char key[100];
} HashTestRow; } HashTestRow;
void shellParseArgument(int argc, char *argv[]); void shellParseArgument(int argc, char *argv[]);
...@@ -40,7 +42,7 @@ void shellParseArgument(int argc, char *argv[]); ...@@ -40,7 +42,7 @@ void shellParseArgument(int argc, char *argv[]);
void testHashPerformance() { void testHashPerformance() {
int64_t initialMs = taosGetTimestampMs(); int64_t initialMs = taosGetTimestampMs();
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
void * hashHandle = taosHashInit(capacity, hashFp, true); hashHandle = taosHashInit(128, hashFp, true, HASH_NO_LOCK);
int64_t startMs = taosGetTimestampMs(); int64_t startMs = taosGetTimestampMs();
float seconds = (startMs - initialMs) / 1000.0; float seconds = (startMs - initialMs) / 1000.0;
...@@ -48,17 +50,25 @@ void testHashPerformance() { ...@@ -48,17 +50,25 @@ void testHashPerformance() {
for (int32_t t = 1; t <= keyNum; ++t) { for (int32_t t = 1; t <= keyNum; ++t) {
HashTestRow row = {0}; HashTestRow row = {0};
char key[100] = {0}; row.keySize = sprintf(row.key, "0.db.st%d", t);
int32_t keySize = sprintf(key, "0.db.st%d", t);
for (int32_t q = 0; q < q1Times; q++) { for (int32_t q = 0; q < q1Times; q++) {
taosHashGet(hashHandle, &key, keySize); taosHashGet(hashHandle, row.key, row.keySize);
} }
taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow)); taosHashPut(hashHandle, row.key, row.keySize, &row, sizeof(HashTestRow));
for (int32_t q = 0; q < q2Times; q++) { for (int32_t q = 0; q < q2Times; q++) {
taosHashGet(hashHandle, &key, keySize); taosHashGet(hashHandle, row.key, row.keySize);
}
// test iterator
{
HashTestRow *row = taosHashIterate(hashHandle, NULL);
while (row) {
taosHashGet(hashHandle, row->key, row->keySize);
row = taosHashIterate(hashHandle, row);
}
} }
if (t % printInterval == 0) { if (t % printInterval == 0) {
...@@ -80,9 +90,35 @@ void testHashPerformance() { ...@@ -80,9 +90,35 @@ void testHashPerformance() {
taosHashCleanup(hashHandle); taosHashCleanup(hashHandle);
} }
void *multiThreadFunc(void *param) {
for (int i = 0; i < 100; ++i) {
taosMsleep(1000);
HashTestRow *row = taosHashIterate(hashHandle, NULL);
while (row) {
taosHashGet(hashHandle, row->key, row->keySize);
row = taosHashIterate(hashHandle, row);
}
int64_t hashSize = taosHashGetSize(hashHandle);
pPrint("i:%d hashSize:%ld", i, hashSize);
}
return NULL;
}
void multiThreadTest() {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// Start threads to write
pthread_create(&thread, &thattr, multiThreadFunc, NULL);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
shellParseArgument(argc, argv); shellParseArgument(argc, argv);
multiThreadTest();
testHashPerformance(); testHashPerformance();
pthread_join(thread, NULL);
} }
void printHelp() { void printHelp() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册