提交 b286a296 编写于 作者: S slguan

[TD-52] first version mpeer

上级 1fb5a8c2
......@@ -31,6 +31,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(taosd balance sync)
ENDIF ()
IF (TD_MPEER)
TARGET_LINK_LIBRARIES(taosd mpeer sync)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
......
......@@ -24,6 +24,7 @@ int32_t dnodeInitMgmt();
void dnodeCleanupMgmt();
void dnodeMgmt(SRpcMsg *rpcMsg);
void dnodeUpdateDnodeId(int32_t dnodeId);
int32_t dnodeGetDnodeId();
void* dnodeGetVnode(int32_t vgId);
int32_t dnodeGetVnodeStatus(void *pVnode);
......
......@@ -294,4 +294,4 @@ uint32_t dnodeGetMnodeMasteIp() {
void* dnodeGetMpeerInfos() {
return &tsMnodeInfos;
}
\ No newline at end of file
}
......@@ -299,3 +299,7 @@ void dnodeUpdateDnodeId(int32_t dnodeId) {
dnodeSaveDnodeId();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeId;
}
\ No newline at end of file
......@@ -45,7 +45,6 @@ struct _mnode_obj;
typedef struct _mnode_obj {
int32_t mnodeId;
int32_t dnodeId;
int64_t createdTime;
int8_t reserved[14];
int8_t updateEnd[1];
......
......@@ -45,6 +45,11 @@ void mpeerGetMpeerInfos(void *mpeers);
char * mpeerGetMnodeStatusStr(int32_t status);
char * mpeerGetMnodeRoleStr(int32_t role);
int32_t mpeerAddMnode(int32_t dnodeId);
int32_t mpeerRemoveMnode(int32_t dnodeId);
int32_t sdbForwardDbReqToPeer(void *pHead);
#ifdef __cplusplus
}
#endif
......
......@@ -34,6 +34,7 @@ typedef enum {
typedef enum {
SDB_KEY_STRING,
SDB_KEY_INT,
SDB_KEY_AUTO
} ESdbKeyType;
......@@ -66,8 +67,19 @@ typedef struct {
int32_t (*updateAllFp)();
} SSdbTableDesc;
typedef struct {
int32_t code;
int64_t version;
void * sync;
void * wal;
sem_t sem;
pthread_mutex_t mutex;
} SSdbObject;
int32_t sdbInit();
void sdbCleanUp();
SSdbObject *sdbGetObj();
int sdbProcessWrite(void *param, void *data, int type);
void * sdbOpenTable(SSdbTableDesc *desc);
void sdbCloseTable(void *handle);
......
......@@ -109,6 +109,11 @@ int32_t mgmtStartSystem() {
return -1;
}
if (mpeerInit() < 0) {
mError("failed to init mpeers");
return -1;
}
if (sdbInit() < 0) {
mError("failed to init sdb");
return -1;
......@@ -122,11 +127,6 @@ int32_t mgmtStartSystem() {
return -1;
}
if (mpeerInit() < 0) {
mError("failed to init mpeers");
return -1;
}
if (balanceInit() < 0) {
mError("failed to init dnode balance")
}
......
......@@ -79,7 +79,7 @@ void mpeerGetMpeerInfos(void *param) {
strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName);
}
void mpeerCleanupDnodes() {}
void mpeerCleanupMnodes() {}
int32_t mpeerGetMnodesNum() { return 1; }
void mpeerReleaseMnode(struct _mnode_obj *pMnode) {}
bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; }
......@@ -91,12 +91,11 @@ bool mpeerCheckRedirect() { return false; }
int32_t mpeerInit() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes);
return mpeerInitMnodes();
}
void mpeerCleanup() {
mpeerCleanupDnodes();
mpeerCleanupMnodes();
}
char *mpeerGetMnodeStatusStr(int32_t status) {
......
......@@ -24,19 +24,11 @@
#include "tutil.h"
#include "twal.h"
#include "tsync.h"
#include "mpeer.h"
#include "hashint.h"
#include "hashstr.h"
#include "mgmtSdb.h"
typedef struct {
int32_t code;
int64_t version;
void * sync;
void * wal;
sem_t sem;
pthread_mutex_t mutex;
} SSdbSync;
typedef struct _SSdbTable {
char tableName[TSDB_DB_NAME_LEN + 1];
ESdbTable tableId;
......@@ -70,17 +62,16 @@ typedef enum {
static SSdbTable *tsSdbTableList[SDB_TABLE_MAX] = {0};
static int32_t tsSdbNumOfTables = 0;
static SSdbSync * tsSdbSync;
static SSdbObject * tsSdbObj;
static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash};
static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash};
static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash};
static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData};
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData};
static int sdbProcessWrite(void *param, void *data, int type);
static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash};
static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash, sdbAddIntHash};
static void (*sdbDeleteIndexFp[])(void *handle, void *key) = {sdbDeleteStrHash, sdbDeleteIntHash, sdbDeleteIntHash};
static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, sdbGetIntHashData, sdbGetIntHashData};
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
uint64_t sdbGetVersion() { return tsSdbSync->version; }
uint64_t sdbGetVersion() { return tsSdbObj->version; }
int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
......@@ -101,6 +92,7 @@ static char *sdbGetkeyStr(SSdbTable *pTable, void *row) {
switch (pTable->keyType) {
case SDB_KEY_STRING:
return (char *)row;
case SDB_KEY_INT:
case SDB_KEY_AUTO:
sprintf(str, "%d", *(int32_t *)row);
return str;
......@@ -113,40 +105,30 @@ static void *sdbGetTableFromId(int32_t tableId) {
return tsSdbTableList[tableId];
}
// static void mpeerConfirmForward(void *ahandle, void *param, int32_t code) {
// sem_post(&tsSdbSync->sem);
// mPrint("mpeerConfirmForward");
// }
static int32_t sdbForwardDbReqToPeer(SWalHead *pHead) {
// int32_t code = syncForwardToPeer(NULL, pHead, NULL);
// if (code < 0) {
// return code;
// }
// sem_wait(&tsSdbSync->sem);
// return tsSdbSync->code;
#ifndef _MPEER
int32_t sdbForwardDbReqToPeer(void *pHead) {
return TSDB_CODE_SUCCESS;
}
#endif
int32_t sdbInit() {
tsSdbSync = calloc(1, sizeof(SSdbSync));
sem_init(&tsSdbSync->sem, 0, 0);
pthread_mutex_init(&tsSdbSync->mutex, NULL);
tsSdbObj = calloc(1, sizeof(SSdbObject));
sem_init(&tsSdbObj->sem, 0, 0);
pthread_mutex_init(&tsSdbObj->mutex, NULL);
SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1};
tsSdbSync->wal = walOpen(tsMnodeDir, &walCfg);
if (tsSdbSync->wal == NULL) {
tsSdbObj->wal = walOpen(tsMnodeDir, &walCfg);
if (tsSdbObj->wal == NULL) {
sdbError("failed to open sdb in %s", tsMnodeDir);
return -1;
}
sdbTrace("open sdb file for read");
walRestore(tsSdbSync->wal, tsSdbSync, sdbProcessWrite);
walRestore(tsSdbObj->wal, tsSdbObj, sdbProcessWrite);
int32_t totalRows = 0;
int32_t numOfTables = 0;
for (int32_t tableId = SDB_TABLE_DNODE; tableId < SDB_TABLE_MAX; ++tableId) {
for (int32_t tableId = SDB_TABLE_MNODE; tableId < SDB_TABLE_MAX; ++tableId) {
SSdbTable *pTable = sdbGetTableFromId(tableId);
if (pTable == NULL) continue;
if (pTable->updateAllFp) {
......@@ -158,20 +140,24 @@ int32_t sdbInit() {
sdbTrace("table:%s, is initialized, numOfRows:%d", pTable->tableName, pTable->numOfRows);
}
sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbSync->version, totalRows, numOfTables);
sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables);
return TSDB_CODE_SUCCESS;
}
void sdbCleanUp() {
if (tsSdbSync) {
sem_destroy(&tsSdbSync->sem);
pthread_mutex_destroy(&tsSdbSync->mutex);
walClose(tsSdbSync->wal);
free(tsSdbSync);
tsSdbSync = NULL;
if (tsSdbObj) {
sem_destroy(&tsSdbObj->sem);
pthread_mutex_destroy(&tsSdbObj->mutex);
walClose(tsSdbObj->wal);
free(tsSdbObj);
tsSdbObj = NULL;
}
}
SSdbObject *sdbGetObj() {
return tsSdbObj;
}
void sdbIncRef(void *handle, void *pRow) {
if (pRow) {
SSdbTable *pTable = handle;
......@@ -278,20 +264,20 @@ static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) {
int32_t code = 0;
pthread_mutex_lock(&tsSdbSync->mutex);
tsSdbSync->version++;
pHead->version = tsSdbSync->version;
pthread_mutex_lock(&tsSdbObj->mutex);
tsSdbObj->version++;
pHead->version = tsSdbObj->version;
code = sdbForwardDbReqToPeer(pHead);
if (code != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&tsSdbSync->mutex);
pthread_mutex_unlock(&tsSdbObj->mutex);
sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName,
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
return code;
}
code = walWrite(tsSdbSync->wal, pHead);
pthread_mutex_unlock(&tsSdbSync->mutex);
code = walWrite(tsSdbObj->wal, pHead);
pthread_mutex_unlock(&tsSdbObj->mutex);
if (code < 0) {
sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName,
......@@ -301,26 +287,26 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_
sdbGetkeyStr(pTable, pHead->cont), pHead->version);
}
walFsync(tsSdbSync->wal);
walFsync(tsSdbObj->wal);
free(pHead);
return code;
}
static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) {
pthread_mutex_lock(&tsSdbSync->mutex);
if (pHead->version <= tsSdbSync->version) {
pthread_mutex_unlock(&tsSdbSync->mutex);
pthread_mutex_lock(&tsSdbObj->mutex);
if (pHead->version <= tsSdbObj->version) {
pthread_mutex_unlock(&tsSdbObj->mutex);
return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbSync->version + 1) {
pthread_mutex_unlock(&tsSdbSync->mutex);
} else if (pHead->version != tsSdbObj->version + 1) {
pthread_mutex_unlock(&tsSdbObj->mutex);
sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version,
tsSdbSync->version);
tsSdbObj->version);
return TSDB_CODE_OTHERS;
}
tsSdbSync->version = pHead->version;
tsSdbObj->version = pHead->version;
sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
......@@ -335,7 +321,7 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_
if (code < 0) {
sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
pthread_mutex_unlock(&tsSdbSync->mutex);
pthread_mutex_unlock(&tsSdbObj->mutex);
return code;
}
......@@ -369,17 +355,17 @@ static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_
if (code < 0) {
sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
pthread_mutex_unlock(&tsSdbSync->mutex);
pthread_mutex_unlock(&tsSdbObj->mutex);
return code;
}
code = sdbInsertLocal(pTable, &oper2);
}
pthread_mutex_unlock(&tsSdbSync->mutex);
pthread_mutex_unlock(&tsSdbObj->mutex);
return code;
}
static int sdbProcessWrite(void *param, void *data, int type) {
int sdbProcessWrite(void *param, void *data, int type) {
SWalHead *pHead = data;
int32_t tableId = pHead->msgType / 10;
int32_t action = pHead->msgType % 10;
......@@ -426,7 +412,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
(*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize;
int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType);
int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType);
if (code < 0) return code;
}
......@@ -453,6 +439,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
case SDB_KEY_STRING:
rowSize = strlen((char *)pOper->pObj) + 1;
break;
case SDB_KEY_INT:
case SDB_KEY_AUTO:
rowSize = sizeof(uint64_t);
break;
......@@ -467,7 +454,7 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
memcpy(pHead->cont, pOper->pObj, rowSize);
int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType);
int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType);
if (code < 0) return code;
}
......@@ -497,7 +484,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
(*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize;
int32_t code = sdbProcessWrite(tsSdbSync, pHead, pHead->msgType);
int32_t code = sdbProcessWrite(tsSdbObj, pHead, pHead->msgType);
if (code < 0) return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册