diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 7a029b8f10fe6776b7156797d2a91dc50b0464cf..111509677d94caf366852100e84d5c2f1347fcca 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -16,62 +16,130 @@ #ifndef _TD_SDB_H_ #define _TD_SDB_H_ -#include "cJSON.h" - #ifdef __cplusplus extern "C" { #endif +#define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \ + { \ + if ((dataLen) >= (valLen)) { \ + memcpy((val), (char *)(pData), (valLen)); \ + (dataLen) -= (valLen); \ + (pData) = (char *)(pData) + (valLen); \ + } else { \ + code = TSDB_CODE_SDB_INVAID_RAW_DATA_LEN; \ + } \ + } + +#define SDB_GET_INT32_VAL(pData, dataLen, val, code) \ + { \ + if (dataLen >= sizeof(int32_t)) { \ + *(int32_t *)(pData) = (int32_t)(val); \ + (dataLen) -= sizeof(int32_t); \ + (pData) = (char *)(pData) + sizeof(int32_t); \ + } else { \ + code = TSDB_CODE_SDB_INVAID_RAW_DATA_LEN; \ + } \ + } + +#define SDB_GET_INT64_VAL(pData, dataLen, val, code) \ + { \ + if (dataLen >= sizeof(int64_t)) { \ + *(int64_t *)(pData) = (int64_t)(val); \ + (dataLen) -= sizeof(int64_t); \ + (pData) = (char *)(pData) + sizeof(int64_t); \ + } else { \ + code = TSDB_CODE_SDB_INVAID_RAW_DATA_LEN; \ + } \ + } + +#define SDB_SET_INT64_VAL(pData, dataLen, val) \ + { \ + *(int64_t *)(pData) = (int64_t)(val); \ + (dataLen) += sizeof(int64_t); \ + (pData) += sizeof(int64_t); \ + } + +#define SDB_SET_INT32_VAL(pData, dataLen, val) \ + { \ + *(int32_t *)(pData) = (int32_t)(val); \ + (dataLen) += sizeof(int32_t); \ + (pData) += sizeof(int32_t); \ + } + +#define SDB_SET_BINARY_VAL(pData, dataLen, val, valLen) \ + { \ + memcpy((char *)(pData), (val), (valLen)); \ + (dataLen) += (valLen); \ + (pData) += (valLen); \ + } + typedef enum { - MN_SDB_START = 0, - MN_SDB_CLUSTER = 1, - MN_SDB_DNODE = 2, - MN_SDB_MNODE = 3, - MN_SDB_ACCT = 4, - MN_SDB_AUTH = 5, - MN_SDB_USER = 6, - MN_SDB_DB = 7, - MN_SDB_VGROUP = 8, - MN_SDB_STABLE = 9, - MN_SDB_FUNC = 10, - MN_SDB_OPER = 11, - MN_SDB_MAX = 12 -} EMnSdb; - -typedef enum { MN_OP_START = 0, MN_OP_INSERT = 1, MN_OP_UPDATE = 2, MN_OP_DELETE = 3, MN_OP_MAX = 4 } EMnOp; - -typedef enum { MN_KEY_START = 0, MN_KEY_BINARY = 1, MN_KEY_INT32 = 2, MN_KEY_INT64 = 3, MN_KEY_MAX } EMnKey; - -typedef enum { MN_SDB_STAT_AVAIL = 0, MN_SDB_STAT_DROPPED = 1 } EMnSdbStat; + SDB_START = 0, + SDB_VERSION = 1, + SDB_CLUSTER = 2, + SDB_DNODE = 3, + SDB_MNODE = 4, + SDB_ACCT = 5, + SDB_AUTH = 6, + SDB_USER = 7, + SDB_DB = 8, + SDB_VGROUP = 9, + SDB_STABLE = 10, + SDB_FUNC = 11, + SDB_OPER = 12, + SDB_MAX = 13 +} ESdbType; + +typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction; +typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; +typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY, SDB_STATUS_DROPPING, SDB_STATUS_DROPPED } ESdbStatus; typedef struct { - int8_t type; - int8_t status; - int8_t align[6]; -} SdbHead; + int8_t type; + int8_t sver; + int8_t status; + int8_t action; + int8_t reserved[4]; + int32_t cksum; + int32_t dataLen; + char data[]; +} SSdbRawData; + +typedef int32_t (*SdbInsertFp)(void *pObj); +typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); +typedef int32_t (*SdbDeleteFp)(void *pObj); +typedef int32_t (*SdbDeployFp)(); +typedef void *(*SdbDecodeFp)(SSdbRawData *pRaw); +typedef SSdbRawData *(*SdbEncodeFp)(void *pObj); -typedef void (*SdbDeployFp)(); -typedef void *(*SdbDecodeFp)(cJSON *root); -typedef int32_t (*SdbEncodeFp)(void *pHead, char *buf, int32_t maxLen); +typedef struct { + ESdbType sdbType; + EKeyType keyType; + SdbDeployFp deployFp; + SdbEncodeFp encodeFp; + SdbDecodeFp decodeFp; + SdbInsertFp insertFp; + SdbUpdateFp updateFp; + SdbDeleteFp deleteFp; +} SSdbDesc; int32_t sdbInit(); void sdbCleanup(); +void sdbSetHandler(SSdbDesc desc); int32_t sdbRead(); +int32_t sdbWrite(SSdbRawData *pRawData); int32_t sdbCommit(); int32_t sdbDeploy(); void sdbUnDeploy(); -void *sdbInsertRow(EMnSdb sdb, void *pObj); -void sdbDeleteRow(EMnSdb sdb, void *pHead); -void *sdbUpdateRow(EMnSdb sdb, void *pHead); -void *sdbGetRow(EMnSdb sdb, void *pKey); -void *sdbFetchRow(EMnSdb sdb, void *pIter); -void sdbCancelFetch(EMnSdb sdb, void *pIter); -int32_t sdbGetCount(EMnSdb sdb); - -void sdbSetFp(EMnSdb, EMnKey, SdbDeployFp, SdbEncodeFp, SdbDecodeFp, int32_t dataSize); +void *sdbAcquire(ESdbType sdb, void *pKey); +void sdbRelease(ESdbType sdb, void *pObj); +void *sdbFetch(ESdbType sdb, void *pIter); +void sdbCancelFetch(ESdbType sdb, void *pIter); +int32_t sdbGetSize(ESdbType sdb); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6c46a4c89fffb10d3c7bafed62ed1b77a2ef181d..ba3dad63aff745459947ab32ca1ef96cf15f62f7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -131,12 +131,15 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir") #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components") -#define TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320) //"Object already there") -#define TSDB_CODE_MND_SDB_ERROR TAOS_DEF_ERROR_CODE(0, 0x0321) //"Unexpected generic error in sdb") -#define TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0322) //"Invalid table type") -#define TSDB_CODE_MND_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323) //"Object not there") -#define TSDB_CODE_MND_SDB_INVAID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0324) //"Invalid meta row") -#define TSDB_CODE_MND_SDB_INVAID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325) //"Invalid key type") +#define TSDB_CODE_SDB_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) +#define TSDB_CODE_SDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0321) +#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0322) +#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323) +#define TSDB_CODE_SDB_INVAID_RAW_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0324) +#define TSDB_CODE_SDB_INVAID_RAW_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0325) +#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326) +#define TSDB_CODE_SDB_INVAID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0327) +#define TSDB_CODE_SDB_INVAID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0328) #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists") #define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist") @@ -155,12 +158,12 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED TAOS_DEF_ERROR_CODE(0, 0x033E) //"Dnode Ep not configured") #define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) //"Account already exists") -#define TSDB_CODE_MND_INVALID_ACCT TAOS_DEF_ERROR_CODE(0, 0x0341) //"Invalid account") +#define TSDB_CODE_MND_ACCT_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0341) //"Invalid account") #define TSDB_CODE_MND_INVALID_ACCT_OPTION TAOS_DEF_ERROR_CODE(0, 0x0342) //"Invalid account options") #define TSDB_CODE_MND_ACCT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0343) //"Account authorization has expired") #define TSDB_CODE_MND_USER_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0350) //"User already exists") -#define TSDB_CODE_MND_INVALID_USER TAOS_DEF_ERROR_CODE(0, 0x0351) //"Invalid user") +#define TSDB_CODE_MND_USER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0351) //"Invalid user") #define TSDB_CODE_MND_INVALID_USER_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0352) //"Invalid user format") #define TSDB_CODE_MND_INVALID_PASS_FORMAT TAOS_DEF_ERROR_CODE(0, 0x0353) //"Invalid password format") #define TSDB_CODE_MND_NO_USER_FROM_CONN TAOS_DEF_ERROR_CODE(0, 0x0354) //"Can not get user from conn") diff --git a/source/dnode/mnode/impl/inc/mnodeDef.h b/source/dnode/mnode/impl/inc/mnodeDef.h index d3ddba332e2fe2bdf1f0bb9c69ce0df7fbef0cbb..973e90656fddcb7557e863105bc2a84cff4c4a1d 100644 --- a/source/dnode/mnode/impl/inc/mnodeDef.h +++ b/source/dnode/mnode/impl/inc/mnodeDef.h @@ -81,7 +81,6 @@ typedef enum { typedef struct SClusterObj { - SdbHead head; int64_t id; char uid[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; @@ -89,7 +88,6 @@ typedef struct SClusterObj { } SClusterObj; typedef struct SDnodeObj { - SdbHead head; int32_t id; int32_t vnodes; int64_t createdTime; @@ -106,7 +104,6 @@ typedef struct SDnodeObj { } SDnodeObj; typedef struct SMnodeObj { - SdbHead head; int32_t id; int8_t status; int8_t role; @@ -122,8 +119,8 @@ typedef struct { int32_t maxDbs; int32_t maxTimeSeries; int32_t maxStreams; - int64_t maxStorage; // In unit of GB - int8_t accessState; // Configured only by command + int64_t maxStorage; // In unit of GB + int32_t accessState; // Configured only by command } SAcctCfg; typedef struct { @@ -136,18 +133,16 @@ typedef struct { } SAcctInfo; typedef struct SAcctObj { - SdbHead head; char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; int32_t acctId; - int8_t status; + int32_t status; SAcctCfg cfg; SAcctInfo info; } SAcctObj; typedef struct SUserObj { - SdbHead head; char user[TSDB_USER_LEN]; char pass[TSDB_KEY_LEN]; char acct[TSDB_USER_LEN]; @@ -182,7 +177,6 @@ typedef struct { } SDbCfg; typedef struct SDbObj { - SdbHead head; char name[TSDB_FULL_DB_NAME_LEN]; char acct[TSDB_USER_LEN]; int64_t createdTime; @@ -226,7 +220,6 @@ typedef struct SVgObj { } SVgObj; typedef struct SSTableObj { - SdbHead head; char tableId[TSDB_TABLE_NAME_LEN]; uint64_t uid; int64_t createdTime; @@ -237,7 +230,6 @@ typedef struct SSTableObj { } SSTableObj; typedef struct SFuncObj { - SdbHead head; char name[TSDB_FUNC_NAME_LEN]; char path[128]; int32_t contLen; diff --git a/source/dnode/mnode/impl/src/mnodeAcct.c b/source/dnode/mnode/impl/src/mnodeAcct.c index ae9d666da4a0392909ea84691d51a50ab8b7f1f1..0811a64e926ed8f856f7d6b45c67e437f88c2bb7 100644 --- a/source/dnode/mnode/impl/src/mnodeAcct.c +++ b/source/dnode/mnode/impl/src/mnodeAcct.c @@ -14,133 +14,117 @@ */ #define _DEFAULT_SOURCE -#include "os.h" #include "mnodeInt.h" -static void mnodeCreateDefaultAcct() { - int32_t code = TSDB_CODE_SUCCESS; +#define ACCT_VER 1 - SAcctObj acctObj = {0}; - tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN); - acctObj.cfg = (SAcctCfg){.maxUsers = 128, - .maxDbs = 128, - .maxTimeSeries = INT32_MAX, - .maxStreams = 1000, - .maxStorage = INT64_MAX, - .accessState = TSDB_VN_ALL_ACCCESS}; - acctObj.acctId = 1; - acctObj.createdTime = taosGetTimestampMs(); - acctObj.updateTime = taosGetTimestampMs(); +static SSdbRawData *mnodeAcctActionEncode(SAcctObj *pAcct) { + SSdbRawData *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRawData)); + if (pRaw == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; + } - sdbInsertRow(MN_SDB_ACCT, &acctObj); + int32_t dataLen = 0; + char *pData = pRaw->data; + SDB_SET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN) + SDB_SET_INT64_VAL(pData, dataLen, pAcct->createdTime) + SDB_SET_INT64_VAL(pData, dataLen, pAcct->updateTime) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->acctId) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->status) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams) + SDB_SET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage) + SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState) + + pRaw->dataLen = dataLen; + pRaw->type = SDB_ACCT; + pRaw->sver = ACCT_VER; + return pRaw; } -int32_t mnodeEncodeAcct(SAcctObj *pAcct, char *buf, int32_t maxLen) { - int32_t len = 0; - - len += snprintf(buf + len, maxLen - len, "{\"type\":%d, ", MN_SDB_ACCT); - len += snprintf(buf + len, maxLen - len, "\"acct\":\"%s\", ", pAcct->acct); - len += snprintf(buf + len, maxLen - len, "\"acctId\":\"%d\", ", pAcct->acctId); - len += snprintf(buf + len, maxLen - len, "\"maxUsers\":\"%d\", ", pAcct->cfg.maxUsers); - len += snprintf(buf + len, maxLen - len, "\"maxDbs\":\"%d\", ", pAcct->cfg.maxDbs); - len += snprintf(buf + len, maxLen - len, "\"maxTimeSeries\":\"%d\", ", pAcct->cfg.maxTimeSeries); - len += snprintf(buf + len, maxLen - len, "\"maxStreams\":\"%d\", ", pAcct->cfg.maxStreams); - len += snprintf(buf + len, maxLen - len, "\"maxStorage\":\"%" PRIu64 "\", ", pAcct->cfg.maxStorage); - len += snprintf(buf + len, maxLen - len, "\"accessState\":\"%d\", ", pAcct->cfg.accessState); - len += snprintf(buf + len, maxLen - len, "\"createdTime\":\"%" PRIu64 "\", ", pAcct->createdTime); - len += snprintf(buf + len, maxLen - len, "\"updateTime\":\"%" PRIu64 "\"}\n", pAcct->updateTime); - - return len; -} +static SAcctObj *mnodeAcctActionDecode(SSdbRawData *pRaw) { + if (pRaw->sver != ACCT_VER) { + terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; + return NULL; + } -SAcctObj *mnodeDecodeAcct(cJSON *root) { - int32_t code = -1; SAcctObj *pAcct = calloc(1, sizeof(SAcctObj)); - - cJSON *acct = cJSON_GetObjectItem(root, "acct"); - if (!acct || acct->type != cJSON_String) { - mError("failed to parse acct since acct not found"); - goto DECODE_ACCT_OVER; + if (pAcct == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; } - tstrncpy(pAcct->acct, acct->valuestring, TSDB_USER_LEN); - cJSON *acctId = cJSON_GetObjectItem(root, "acctId"); - if (!acctId || acctId->type != cJSON_String) { - mError("acct:%s, failed to parse since acctId not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->acctId = atol(acctId->valuestring); + int32_t code = 0; + int32_t dataLen = pRaw->dataLen; + char *pData = pRaw->data; + SDB_GET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN, code) + SDB_GET_INT64_VAL(pData, dataLen, pAcct->createdTime, code) + SDB_GET_INT64_VAL(pData, dataLen, pAcct->updateTime, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->acctId, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->status, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams, code) + SDB_GET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage, code) + SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState, code) - cJSON *maxUsers = cJSON_GetObjectItem(root, "maxUsers"); - if (!maxUsers || maxUsers->type != cJSON_String) { - mError("acct:%s, failed to parse since maxUsers not found", pAcct->acct); - goto DECODE_ACCT_OVER; + if (code != 0) { + tfree(pAcct); + terrno = code; + return NULL; } - pAcct->cfg.maxUsers = atol(maxUsers->valuestring); - cJSON *maxDbs = cJSON_GetObjectItem(root, "maxDbs"); - if (!maxDbs || maxDbs->type != cJSON_String) { - mError("acct:%s, failed to parse since maxDbs not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->cfg.maxDbs = atol(maxDbs->valuestring); + return pAcct; +} - cJSON *maxTimeSeries = cJSON_GetObjectItem(root, "maxTimeSeries"); - if (!maxTimeSeries || maxTimeSeries->type != cJSON_String) { - mError("acct:%s, failed to parse since maxTimeSeries not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->cfg.maxTimeSeries = atol(maxTimeSeries->valuestring); +static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; } - cJSON *maxStreams = cJSON_GetObjectItem(root, "maxStreams"); - if (!maxStreams || maxStreams->type != cJSON_String) { - mError("acct:%s, failed to parse since maxStreams not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->cfg.maxStreams = atol(maxStreams->valuestring); +static int32_t mnodeAcctActionDelete(SAcctObj *pAcct) { return 0; } - cJSON *maxStorage = cJSON_GetObjectItem(root, "maxStorage"); - if (!maxStorage || maxStorage->type != cJSON_String) { - mError("acct:%s, failed to parse since maxStorage not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->cfg.maxStorage = atoll(maxStorage->valuestring); +static int32_t mnodeAcctActionUpdate(SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { + memcpy(pDstAcct, pSrcAcct, (int32_t)((char *)&pDstAcct->info - (char *)&pDstAcct)); + return 0; +} - cJSON *accessState = cJSON_GetObjectItem(root, "accessState"); - if (!accessState || accessState->type != cJSON_String) { - mError("acct:%s, failed to parse since accessState not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->cfg.accessState = atol(accessState->valuestring); +static int32_t mnodeCreateDefaultAcct() { + int32_t code = 0; - cJSON *createdTime = cJSON_GetObjectItem(root, "createdTime"); - if (!createdTime || createdTime->type != cJSON_String) { - mError("acct:%s, failed to parse since createdTime not found", pAcct->acct); - goto DECODE_ACCT_OVER; - } - pAcct->createdTime = atol(createdTime->valuestring); + SAcctObj acctObj = {0}; + tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN); + acctObj.createdTime = taosGetTimestampMs(); + acctObj.updateTime = taosGetTimestampMs(); + acctObj.acctId = 1; + acctObj.cfg = (SAcctCfg){.maxUsers = 128, + .maxDbs = 128, + .maxTimeSeries = INT32_MAX, + .maxStreams = 1000, + .maxStorage = INT64_MAX, + .accessState = TSDB_VN_ALL_ACCCESS}; - cJSON *updateTime = cJSON_GetObjectItem(root, "updateTime"); - if (!updateTime || updateTime->type != cJSON_String) { - mError("acct:%s, failed to parse since updateTime not found", pAcct->acct); - goto DECODE_ACCT_OVER; + SSdbRawData *pRaw = mnodeAcctActionEncode(&acctObj); + if (pRaw != NULL) { + code = sdbWrite(pRaw); + } else { + code = terrno; } - pAcct->updateTime = atol(updateTime->valuestring); - - code = 0; - mTrace("acct:%s, parse success", pAcct->acct); -DECODE_ACCT_OVER: - if (code != 0) { - free(pAcct); - pAcct = NULL; - } - return pAcct; + return code; } int32_t mnodeInitAcct() { - sdbSetFp(MN_SDB_ACCT, MN_KEY_BINARY, mnodeCreateDefaultAcct, (SdbEncodeFp)mnodeEncodeAcct, - (SdbDecodeFp)(mnodeDecodeAcct), sizeof(SAcctObj)); + SSdbDesc desc = {.sdbType = SDB_ACCT, + .keyType = SDB_KEY_BINARY, + .deployFp = (SdbDeployFp)mnodeCreateDefaultAcct, + .encodeFp = (SdbEncodeFp)mnodeAcctActionEncode, + .decodeFp = (SdbDecodeFp)mnodeAcctActionDecode, + .insertFp = (SdbInsertFp)mnodeAcctActionInsert, + .updateFp = (SdbUpdateFp)mnodeAcctActionUpdate, + .deleteFp = (SdbDeleteFp)mnodeAcctActionDelete}; + sdbSetHandler(desc); return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c index 4a4c0864225012516afe364c8dad92e079da271f..a77419db516cc0fbd5d5a5891da724fc5fe7a6fd 100644 --- a/source/dnode/mnode/impl/src/mnodeUser.c +++ b/source/dnode/mnode/impl/src/mnodeUser.c @@ -19,111 +19,142 @@ #include "tglobal.h" #include "mnodeInt.h" -static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { - int32_t code = TSDB_CODE_SUCCESS; +#define USER_VER 1 - SUserObj userObj = {0}; - tstrncpy(userObj.user, user, TSDB_USER_LEN); - tstrncpy(userObj.acct, acct, TSDB_USER_LEN); - taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass); - userObj.createdTime = taosGetTimestampMs(); - userObj.updateTime = taosGetTimestampMs(); - - if (strcmp(user, TSDB_DEFAULT_USER) == 0) { - userObj.rootAuth = 1; +static SSdbRawData *mnodeUserActionEncode(SUserObj *pUser) { + SSdbRawData *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRawData)); + if (pRaw == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; } - sdbInsertRow(MN_SDB_USER, &userObj); + int32_t dataLen = 0; + char *pData = pRaw->data; + SDB_SET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN) + SDB_SET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN) + SDB_SET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_KEY_LEN) + SDB_SET_INT64_VAL(pData, dataLen, pUser->createdTime) + SDB_SET_INT64_VAL(pData, dataLen, pUser->updateTime) + SDB_SET_INT32_VAL(pData, dataLen, pUser->rootAuth) + + pRaw->dataLen = dataLen; + pRaw->type = SDB_USER; + pRaw->sver = USER_VER; + return pRaw; } -static void mnodeCreateDefaultUsers() { - mnodeCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS); - mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "monitor", tsInternalPass); - mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, tsInternalPass); -} +static SUserObj *mnodeUserActionDecode(SSdbRawData *pRaw) { + if (pRaw->sver != USER_VER) { + terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; + return NULL; + } -int32_t mnodeEncodeUser(SUserObj *pUser, char *buf, int32_t maxLen) { - int32_t len = 0; - char *base64 = base64_encode((const unsigned char *)pUser->pass, TSDB_KEY_LEN); + SUserObj *pUser = calloc(1, sizeof(SUserObj)); + if (pUser == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; + } - len += snprintf(buf + len, maxLen - len, "{\"type\":%d, ", MN_SDB_USER); - len += snprintf(buf + len, maxLen - len, "\"user\":\"%s\", ", pUser->user); - len += snprintf(buf + len, maxLen - len, "\"auth\":\"%24s\", ", base64); - len += snprintf(buf + len, maxLen - len, "\"acct\":\"%s\", ", pUser->acct); - len += snprintf(buf + len, maxLen - len, "\"createdTime\":\"%" PRIu64 "\", ", pUser->createdTime); - len += snprintf(buf + len, maxLen - len, "\"updateTime\":\"%" PRIu64 "\"}\n", pUser->updateTime); + int32_t code = 0; + int32_t dataLen = pRaw->dataLen; + char *pData = pRaw->data; + SDB_GET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN, code) + SDB_GET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN, code) + SDB_GET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_USER_LEN, code) + SDB_GET_INT64_VAL(pData, dataLen, pUser->createdTime, code) + SDB_GET_INT64_VAL(pData, dataLen, pUser->updateTime, code) + SDB_GET_INT32_VAL(pData, dataLen, pUser->rootAuth, code) - free(base64); - return len; -} + if (code != 0) { + tfree(pUser); + terrno = code; + return NULL; + } -SUserObj *mnodeDecodeUser(cJSON *root) { - int32_t code = -1; - SUserObj *pUser = calloc(1, sizeof(SUserObj)); + return pUser; +} - cJSON *user = cJSON_GetObjectItem(root, "user"); - if (!user || user->type != cJSON_String) { - mError("failed to parse user since user not found"); - goto DECODE_USER_OVER; +static int32_t mnodeUserActionInsert(SUserObj *pUser) { + pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pUser->prohibitDbHash == NULL) { + return TSDB_CODE_MND_OUT_OF_MEMORY; } - tstrncpy(pUser->user, user->valuestring, TSDB_USER_LEN); - if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) { - pUser->rootAuth = 1; + pUser->pAcct = sdbAcquire(SDB_ACCT, pUser->acct); + if (pUser->pAcct == NULL) { + return TSDB_CODE_MND_ACCT_NOT_EXIST; } - cJSON *pass = cJSON_GetObjectItem(root, "auth"); - if (!pass || pass->type != cJSON_String) { - mError("user:%s, failed to parse since auth not found", pUser->user); - goto DECODE_USER_OVER; - } + return 0; +} - int32_t outlen = 0; - char *base64 = (char *)base64_decode(pass->valuestring, strlen(pass->valuestring), &outlen); - if (outlen != TSDB_KEY_LEN) { - mError("user:%s, failed to parse since invalid auth format", pUser->user); - free(base64); - goto DECODE_USER_OVER; - } else { - memcpy(pUser->pass, base64, outlen); - free(base64); +static int32_t mnodeUserActionDelete(SUserObj *pUser) { + if (pUser->prohibitDbHash) { + taosHashCleanup(pUser->prohibitDbHash); + pUser->prohibitDbHash = NULL; } - cJSON *acct = cJSON_GetObjectItem(root, "acct"); - if (!acct || acct->type != cJSON_String) { - mError("user:%s, failed to parse since acct not found", pUser->user); - goto DECODE_USER_OVER; + if (pUser->acct != NULL) { + sdbRelease(SDB_ACCT, pUser->pAcct); + pUser->pAcct = NULL; } - tstrncpy(pUser->acct, acct->valuestring, TSDB_USER_LEN); - cJSON *createdTime = cJSON_GetObjectItem(root, "createdTime"); - if (!createdTime || createdTime->type != cJSON_String) { - mError("user:%s, failed to parse since createdTime not found", pUser->user); - goto DECODE_USER_OVER; + return 0; +} + +static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) { + memcpy(pDstUser, pSrcUser, (int32_t)((char *)&pDstUser->prohibitDbHash - (char *)&pDstUser)); + return 0; +} + +static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { + int32_t code = 0; + + SUserObj userObj = {0}; + tstrncpy(userObj.user, user, TSDB_USER_LEN); + tstrncpy(userObj.acct, acct, TSDB_USER_LEN); + taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass); + userObj.createdTime = taosGetTimestampMs(); + userObj.updateTime = taosGetTimestampMs(); + + if (strcmp(user, TSDB_DEFAULT_USER) == 0) { + userObj.rootAuth = 1; } - pUser->createdTime = atol(createdTime->valuestring); - cJSON *updateTime = cJSON_GetObjectItem(root, "updateTime"); - if (!updateTime || updateTime->type != cJSON_String) { - mError("user:%s, failed to parse since updateTime not found", pUser->user); - goto DECODE_USER_OVER; + SSdbRawData *pRaw = mnodeUserActionEncode(&userObj); + if (pRaw != NULL) { + code = sdbWrite(pRaw); + } else { + code = terrno; } - pUser->updateTime = atol(updateTime->valuestring); - code = 0; - mTrace("user:%s, parse success", pUser->user); + return code; +} -DECODE_USER_OVER: - if (code != 0) { - free(pUser); - pUser = NULL; - } - return pUser; +static int32_t mnodeCreateDefaultUsers() { + int32_t code = mnodeCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS); + if (code != 0) return code; + + code = mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "monitor", tsInternalPass); + if (code != 0) return code; + + code = mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, tsInternalPass); + if (code != 0) return code; + + return code; } int32_t mnodeInitUser() { - sdbSetFp(MN_SDB_USER, MN_KEY_BINARY, mnodeCreateDefaultUsers, (SdbEncodeFp)mnodeEncodeUser, - (SdbDecodeFp)(mnodeDecodeUser), sizeof(SUserObj)); + SSdbDesc desc = {.sdbType = SDB_USER, + .keyType = SDB_KEY_BINARY, + .deployFp = (SdbDeployFp)mnodeCreateDefaultUsers, + .encodeFp = (SdbEncodeFp)mnodeUserActionEncode, + .decodeFp = (SdbDecodeFp)mnodeUserActionDecode, + .insertFp = (SdbInsertFp)mnodeUserActionInsert, + .updateFp = (SdbUpdateFp)mnodeUserActionUpdate, + .deleteFp = (SdbDeleteFp)mnodeUserActionDelete}; + sdbSetHandler(desc); + return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeWorker.c b/source/dnode/mnode/impl/src/mnodeWorker.c index 4b0676f756a70a8f66aa76ce131d86c819f6ebad..cf2f415b4df10a5b142843caab65fa21a74120df 100644 --- a/source/dnode/mnode/impl/src/mnodeWorker.c +++ b/source/dnode/mnode/impl/src/mnodeWorker.c @@ -50,7 +50,7 @@ static SMnMsg *mnodeInitMsg2(SRpcMsg *pRpcMsg) { SRpcConnInfo connInfo = {0}; if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) == 0) { - pMsg->pUser = sdbGetRow(MN_SDB_USER, connInfo.user); + pMsg->pUser = sdbAcquire(SDB_USER, connInfo.user); } if (pMsg->pUser == NULL) { @@ -77,7 +77,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) { } else { SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER}; + SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; rpcSendResponse(&rpcRsp); } else { mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); @@ -103,7 +103,7 @@ static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) { } else { SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER}; + SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; rpcSendResponse(&rpcRsp); } else { mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); @@ -120,7 +120,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) { } else { SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER}; + SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; rpcSendResponse(&rpcRsp); } else { mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle, @@ -135,7 +135,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) { void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) { SMnMsg *pMsg = mnodeInitMsg2(pRpcMsg); if (pMsg == NULL) { - SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_INVALID_USER}; + SRpcMsg rpcRsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_MND_USER_NOT_EXIST}; rpcSendResponse(&rpcRsp); } else { mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle, diff --git a/source/dnode/mnode/impl/src/mondeInt.c b/source/dnode/mnode/impl/src/mondeInt.c index d92c52e6c8438c735fee860b0940188ca7b60375..a7c76360e278ba1767f9ff15e27324b6506408f9 100644 --- a/source/dnode/mnode/impl/src/mondeInt.c +++ b/source/dnode/mnode/impl/src/mondeInt.c @@ -161,7 +161,7 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { if (code != 0) { mError("failed to deploy mnode since init step1 error"); tsMint.state = MN_STATUS_UNINIT; - return TSDB_CODE_MND_SDB_ERROR; + return TSDB_CODE_MND_APP_ERROR; } code = mnodeInitStep2(); @@ -169,7 +169,7 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { mnodeCleanupStep1(); mError("failed to deploy mnode since init step2 error"); tsMint.state = MN_STATUS_UNINIT; - return TSDB_CODE_MND_SDB_ERROR; + return TSDB_CODE_MND_APP_ERROR; } mDebug("mnode is deployed and waiting for raft to confirm"); diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 1eafb0518b0976a150c878783dc9deb85e618c05..3493e97ef229f506055f05d0cf9ac5c2503078b8 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -17,17 +17,17 @@ #define _TD_SDB_INT_H_ #include "os.h" +#include "sdb.h" #include "taosmsg.h" -#include "tlog.h" -#include "thash.h" #include "tglobal.h" -#include "sdb.h" +#include "thash.h" +#include "tlockfree.h" +#include "tlog.h" #ifdef __cplusplus extern "C" { #endif -// mnode log function #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }} #define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }} @@ -35,6 +35,32 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} +#define SDB_MAX_SIZE (32*1024) + +typedef struct { + char *currDir; + char *syncDir; + char *tmpDir; + int64_t lastCommitVer; + int64_t curVer; + EKeyType keyTypes[SDB_MAX]; + SHashObj *hashObjs[SDB_MAX]; + SRWLatch locks[SDB_MAX]; + SdbInsertFp insertFps[SDB_MAX]; + SdbUpdateFp updateFps[SDB_MAX]; + SdbDeleteFp deleteFps[SDB_MAX]; + SdbDeployFp deployFps[SDB_MAX]; + SdbEncodeFp encodeFps[SDB_MAX]; + SdbDecodeFp decodeFps[SDB_MAX]; +} SSdbObj; + +typedef struct { + ESdbStatus status; + int32_t refCount; + int32_t dataLen; + char *data[]; +} SSdbRow; + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 145895798e4d7add23c1c53b4d7cb1df3439e3eb..001b95c5c4f9e13ef897a830dfbcdd3d30bc69ac 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -14,21 +14,9 @@ */ #define _DEFAULT_SOURCE -#include "cJSON.h" #include "sdbInt.h" -static struct { - char currDir[PATH_MAX]; - char backDir[PATH_MAX]; - char tmpDir[PATH_MAX]; - int64_t version; - EMnKey hashKey[MN_SDB_MAX]; - int32_t dataSize[MN_SDB_MAX]; - SHashObj *hashObj[MN_SDB_MAX]; - SdbDeployFp deployFp[MN_SDB_MAX]; - SdbEncodeFp encodeFp[MN_SDB_MAX]; - SdbDecodeFp decodeFp[MN_SDB_MAX]; -} tsSdb = {0}; +static SSdbObj tsSdb = {0}; static int32_t sdbCreateDir() { if (!taosMkDir(tsSdb.currDir)) { @@ -36,8 +24,8 @@ static int32_t sdbCreateDir() { return TAOS_SYSTEM_ERROR(errno); } - if (!taosMkDir(tsSdb.backDir)) { - mError("failed to create dir:%s", tsSdb.backDir); + if (!taosMkDir(tsSdb.syncDir)) { + mError("failed to create dir:%s", tsSdb.syncDir); return -1; } @@ -50,8 +38,8 @@ static int32_t sdbCreateDir() { } static int32_t sdbRunDeployFp() { - for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) { - SdbDeployFp fp = tsSdb.deployFp[i]; + for (int32_t i = SDB_START; i < SDB_MAX; ++i) { + SdbDeployFp fp = tsSdb.deployFps[i]; if (fp) { (*fp)(); } @@ -60,152 +48,147 @@ static int32_t sdbRunDeployFp() { return 0; } -static int32_t sdbReadVersion(cJSON *root) { - cJSON *ver = cJSON_GetObjectItem(root, "version"); - if (!ver || ver->type != cJSON_String) { - mError("failed to parse version since version not found"); - return -1; +static SHashObj *sdbGetHash(int32_t sdb) { + if (sdb >= SDB_MAX || sdb <= SDB_START) { + return NULL; } - tsSdb.version = (int64_t)atoll(ver->valuestring); - mTrace("parse version success, version:%" PRIu64, tsSdb.version); + SHashObj *hash = tsSdb.hashObjs[sdb]; + if (hash == NULL) { + return NULL; + } - return 0; + return hash; } -static void sdbWriteVersion(FileFd fd) { - char content[128]; - int32_t len = - snprintf(content, sizeof(content), "{\"type\":0, \"version\":\"%" PRIu64 "\", \"updateTime\":\"%" PRIu64 "\"}\n", - tsSdb.version, taosGetTimestampMs()); - taosWriteFile(fd, content, len); -} +int32_t sdbWrite(SSdbRawData *pRaw) { + SHashObj *hash = sdbGetHash(pRaw->type); + switch (pRaw->action) { + case SDB_ACTION_INSERT: + break; + case SDB_ACTION_UPDATE: + break; + case SDB_ACTION_DELETE: + break; -static int32_t sdbReadDataFile() { - ssize_t _bytes = 0; - size_t len = 4096; - char *line = calloc(1, len); - int32_t code = -1; - FILE *fp = NULL; - cJSON *root = NULL; - - char file[PATH_MAX + 20]; - snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); - fp = fopen(file, "r"); - if (!fp) { - mDebug("failed to open file:%s for read since %s", file, strerror(errno)); - goto PARSE_SDB_DATA_ERROR; + default: + break; } - while (!feof(fp)) { - memset(line, 0, len); - _bytes = tgetline(&line, &len, fp); - if (_bytes < 0) { - break; - } + return 0; +} - line[len - 1] = 0; - if (len <= 10) continue; +static int32_t sdbWriteVersion(FileFd fd) { return 0; } - root = cJSON_Parse(line); - if (root == NULL) { - mError("failed to parse since invalid json format, %s", line); - goto PARSE_SDB_DATA_ERROR; - } +static int32_t sdbReadVersion(FileFd fd) { return 0; } - cJSON *type = cJSON_GetObjectItem(root, "type"); - if (!type || type->type != cJSON_Number) { - mError("failed to parse since invalid type not found, %s", line); - goto PARSE_SDB_DATA_ERROR; - } +static int32_t sdbReadDataFile() { + int32_t code = 0; - if (type->valueint >= MN_SDB_MAX || type->valueint < MN_SDB_START) { - mError("failed to parse since invalid type, %s", line); - goto PARSE_SDB_DATA_ERROR; + SSdbRawData *pRaw = malloc(SDB_MAX_SIZE); + if (pRaw == NULL) { + return TSDB_CODE_MND_OUT_OF_MEMORY; + } + + char file[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + FileFd fd = taosOpenFileCreateWrite(file); + if (fd <= 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s for read since %s", file, tstrerror(code)); + return code; + } + + int64_t offset = 0; + while (1) { + int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRawData)); + if (ret == 0) break; + + if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; } - if (type->valueint == MN_SDB_START) { - if (sdbReadVersion(root) != 0) { - mError("failed to parse version, %s", line); - goto PARSE_SDB_DATA_ERROR; - } - cJSON_Delete(root); - root = NULL; - continue; + if (ret < sizeof(SSdbRawData)) { + code = TSDB_CODE_SDB_INTERNAL_ERROR; + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; } - SdbDecodeFp decodeFp = tsSdb.decodeFp[type->valueint]; - SdbHead *pHead = (*decodeFp)(root); - if (pHead == NULL) { - mError("failed to parse since decode error, %s", line); + code = sdbWrite(pRaw); + if (code != 0) { + mError("failed to read file:%s since %s", file, tstrerror(code)); goto PARSE_SDB_DATA_ERROR; } - - pHead->type = type->valueint; - pHead->status = MN_SDB_STAT_AVAIL; - - sdbInsertRow(pHead->type, pHead); - free(pHead); - cJSON_Delete(root); - root = NULL; } code = 0; PARSE_SDB_DATA_ERROR: - if (line) free(line); - if (fp) fclose(fp); - if (root) cJSON_Delete(root); - + taosCloseFile(fd); return code; } static int32_t sdbWriteDataFile() { - char file[PATH_MAX + 20] = {0}; - snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); - FileFd fd = taosOpenFileCreateWrite(file); + int32_t code = 0; + + char tmpfile[PATH_MAX] = {0}; + snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir); + + FileFd fd = taosOpenFileCreateWrite(tmpfile); if (fd <= 0) { - mError("failed to open file:%s for write since %s", file, strerror(errno)); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code)); + return code; } - int32_t len; - int32_t maxLen = 10240; - char *buf = malloc(maxLen); - - for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) { - SHashObj *hash = tsSdb.hashObj[i]; + for (int32_t i = SDB_START; i < SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObjs[i]; if (!hash) continue; - SdbEncodeFp encodeFp = tsSdb.encodeFp[i]; + SdbEncodeFp encodeFp = tsSdb.encodeFps[i]; if (!encodeFp) continue; - SdbHead *pHead = taosHashIterate(hash, NULL); - while (pHead != NULL) { - len = (*encodeFp)(pHead, buf, maxLen); - if (len >= 0) { - taosWriteFile(fd, buf, len); + SSdbRow *pRow = taosHashIterate(hash, NULL); + while (pRow != NULL) { + if (pRow->status == SDB_STATUS_READY) continue; + SSdbRawData *pRaw = (*encodeFp)(pRow->data); + if (pRaw != NULL) { + taosWriteFile(fd, pRaw, sizeof(SSdbRawData) + pRaw->dataLen); + } else { + taosHashCancelIterate(hash, pRow); + code = TSDB_CODE_SDB_INTERNAL_ERROR; + break; } - pHead = taosHashIterate(hash, pHead); + pRow = taosHashIterate(hash, pRow); } } - sdbWriteVersion(fd); - taosFsyncFile(fd); + if (code == 0) { + code = sdbWriteVersion(fd); + } + taosCloseFile(fd); - mInfo("write file:%s successfully", file); - return 0; -} + if (code == 0) { + code = taosFsyncFile(fd); + } -int32_t sdbCommit() { - int32_t code = sdbWriteDataFile(); if (code != 0) { - return code; + char curfile[PATH_MAX] = {0}; + snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); + code = taosRenameFile(tmpfile, curfile); } - return 0; + if (code != 0) { + mError("failed to write sdb file since %s", tstrerror(code)); + } else { + mInfo("write sdb file successfully"); + } + + return code; } int32_t sdbRead() { @@ -218,6 +201,15 @@ int32_t sdbRead() { return -1; } +int32_t sdbCommit() { + int32_t code = sdbWriteDataFile(); + if (code != 0) { + return code; + } + + return 0; +} + int32_t sdbDeploy() { if (sdbCreateDir() != 0) { return -1; @@ -231,38 +223,32 @@ int32_t sdbDeploy() { return -1; } - // if (!taosMkDir()) - // if (pMinfos == NULL) { // first deploy - // tsMint.dnodeId = 1; - // bool getuid = taosGetSystemUid(tsMint.clusterId); - // if (!getuid) { - // strcpy(tsMint.clusterId, "tdengine3.0"); - // mError("deploy new mnode but failed to get uid, set to default val %s", tsMint.clusterId); - // } else { - // mDebug("deploy new mnode and uid is %s", tsMint.clusterId); - // } - // } else { // todo - // } - - // if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) { - // mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno)); - // return -1; - // } return 0; } void sdbUnDeploy() {} int32_t sdbInit() { - snprintf(tsSdb.currDir, PATH_MAX, "%s%scurrent%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); - snprintf(tsSdb.backDir, PATH_MAX, "%s%sbackup%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); - snprintf(tsSdb.tmpDir, PATH_MAX, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + char path[PATH_MAX + 100]; + + snprintf(path, PATH_MAX + 100, "%s%scurrent%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + tsSdb.currDir = strdup(path); - for (int32_t i = 0; i < MN_SDB_MAX; ++i) { + snprintf(path, PATH_MAX + 100, "%s%ssync%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + tsSdb.syncDir = strdup(path); + + snprintf(path, PATH_MAX + 100, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + tsSdb.tmpDir = strdup(path); + + if (tsSdb.currDir == NULL || tsSdb.currDir == NULL || tsSdb.currDir == NULL) { + return TSDB_CODE_MND_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < SDB_MAX; ++i) { int32_t type; - if (tsSdb.hashKey[i] == MN_KEY_INT32) { + if (tsSdb.keyTypes[i] == SDB_KEY_INT32) { type = TSDB_DATA_TYPE_INT; - } else if (tsSdb.hashKey[i] == MN_KEY_INT64) { + } else if (tsSdb.keyTypes[i] == SDB_KEY_INT64) { type = TSDB_DATA_TYPE_BIGINT; } else { type = TSDB_DATA_TYPE_BINARY; @@ -270,65 +256,72 @@ int32_t sdbInit() { SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); if (hash == NULL) { - return -1; + return TSDB_CODE_MND_OUT_OF_MEMORY; } - tsSdb.hashObj[i] = hash; + tsSdb.hashObjs[i] = hash; + taosInitRWLatch(&tsSdb.locks[i]); } return 0; } void sdbCleanup() { - for (int32_t i = 0; i < MN_SDB_MAX; ++i) { - SHashObj *hash = tsSdb.hashObj[i]; - if (hash != NULL) { - taosHashCleanup(hash); - } - tsSdb.hashObj[i] = NULL; + if (tsSdb.curVer != tsSdb.lastCommitVer) { + sdbCommit(); } -} -void sdbSetFp(EMnSdb sdb, EMnKey keyType, SdbDeployFp deployFp, SdbEncodeFp encodeFp, SdbDecodeFp decodeFp, - int32_t dataSize) { - tsSdb.deployFp[sdb] = deployFp; - tsSdb.encodeFp[sdb] = encodeFp; - tsSdb.decodeFp[sdb] = decodeFp; - tsSdb.dataSize[sdb] = dataSize; - tsSdb.hashKey[sdb] = keyType; -} + if (tsSdb.currDir != NULL) { + tfree(tsSdb.currDir); + } -static SHashObj *sdbGetHash(int32_t sdb) { - if (sdb >= MN_SDB_MAX || sdb <= MN_SDB_START) { - return NULL; + if (tsSdb.syncDir != NULL) { + tfree(tsSdb.syncDir); } - SHashObj *hash = tsSdb.hashObj[sdb]; - if (hash == NULL) { - return NULL; + if (tsSdb.tmpDir != NULL) { + tfree(tsSdb.tmpDir); } - return hash; + for (int32_t i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObjs[i]; + if (hash != NULL) { + taosHashCleanup(hash); + } + tsSdb.hashObjs[i] = NULL; + } } -void *sdbInsertRow(EMnSdb sdb, void *p) { +void sdbSetHandler(SSdbDesc desc) { + ESdbType sdb = desc.sdbType; + tsSdb.keyTypes[sdb] = desc.keyType; + tsSdb.insertFps[sdb] = desc.insertFp; + tsSdb.updateFps[sdb] = desc.updateFp; + tsSdb.deleteFps[sdb] = desc.deleteFp; + tsSdb.deployFps[sdb] = desc.deployFp; + tsSdb.encodeFps[sdb] = desc.encodeFp; + tsSdb.decodeFps[sdb] = desc.decodeFp; +} + +#if 0 +void *sdbInsertRow(ESdbType sdb, void *p) { SdbHead *pHead = p; pHead->type = sdb; - pHead->status = MN_SDB_STAT_AVAIL; + pHead->status = SDB_AVAIL; - char *pKey = (char *)pHead + sizeof(pHead); - int32_t keySize; - EMnKey keyType = tsSdb.hashKey[pHead->type]; - int32_t dataSize = tsSdb.dataSize[pHead->type]; + char *pKey = (char *)pHead + sizeof(pHead); + int32_t keySize; + EKeyType keyType = tsSdb.keyTypes[pHead->type]; + int32_t dataSize = tsSdb.dataSize[pHead->type]; SHashObj *hash = sdbGetHash(pHead->type); if (hash == NULL) { return NULL; } - if (keyType == MN_KEY_INT32) { + if (keyType == SDBINT32) { keySize = sizeof(int32_t); - } else if (keyType == MN_KEY_BINARY) { + } else if (keyType == SDB_KEY_BINARY) { keySize = strlen(pKey) + 1; } else { keySize = sizeof(int64_t); @@ -338,34 +331,58 @@ void *sdbInsertRow(EMnSdb sdb, void *p) { return taosHashGet(hash, pKey, keySize); } -void sdbDeleteRow(EMnSdb sdb, void *p) { +void sdbDeleteRow(ESdbType sdb, void *p) { SdbHead *pHead = p; - pHead->status = MN_SDB_STAT_DROPPED; + pHead->status = SDB_STATUS_DROPPED; } -void *sdbUpdateRow(EMnSdb sdb, void *pHead) { return sdbInsertRow(sdb, pHead); } +void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); } + +#endif + +void *sdbAcquire(ESdbType sdb, void *pKey) { + terrno = 0; -void *sdbGetRow(EMnSdb sdb, void *pKey) { SHashObj *hash = sdbGetHash(sdb); if (hash == NULL) { return NULL; } - int32_t keySize; - EMnKey keyType = tsSdb.hashKey[sdb]; + int32_t keySize; + EKeyType keyType = tsSdb.keyTypes[sdb]; - if (keyType == MN_KEY_INT32) { - keySize = sizeof(int32_t); - } else if (keyType == MN_KEY_BINARY) { - keySize = strlen(pKey) + 1; + switch (keyType) { + case SDB_KEY_INT32: + keySize = sizeof(int32_t); + break; + case SDB_KEY_INT64: + keySize = sizeof(int64_t); + break; + case SDB_KEY_BINARY: + keySize = strlen(pKey) + 1; + break; + default: + keySize = sizeof(int32_t); + } + + SSdbRow *pRow = taosHashGet(hash, pKey, keySize); + if (pRow == NULL) return NULL; + + if (pRow->status == SDB_STATUS_READY) { + atomic_add_fetch_32(&pRow->refCount, 1); + return pRow->data; } else { - keySize = sizeof(int64_t); + terrno = -1; // todo + return NULL; } +} - return taosHashGet(hash, pKey, keySize); +void sdbRelease(ESdbType sdb, void *pObj) { + SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); + atomic_sub_fetch_32(&pRow->refCount, 1); } -void *sdbFetchRow(EMnSdb sdb, void *pIter) { +void *sdbFetchRow(ESdbType sdb, void *pIter) { SHashObj *hash = sdbGetHash(sdb); if (hash == NULL) { return NULL; @@ -374,16 +391,15 @@ void *sdbFetchRow(EMnSdb sdb, void *pIter) { return taosHashIterate(hash, pIter); } -void sdbCancelFetch(EMnSdb sdb, void *pIter) { +void sdbCancelFetch(ESdbType sdb, void *pIter) { SHashObj *hash = sdbGetHash(sdb); if (hash == NULL) { return; } - taosHashCancelIterate(hash, pIter); } -int32_t sdbGetCount(EMnSdb sdb) { +int32_t sdbGetSize(ESdbType sdb) { SHashObj *hash = sdbGetHash(sdb); if (hash == NULL) { return 0; diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index fa49c20c7748f50ce944165e1e9e2c5d9a33da49..1aafc880bef7679061d879300258c4f7e6efa120 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1582,7 +1582,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { code = htonl(pHead->code); if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_RPC_INVALID_VERSION || - code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { + code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); return 0; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 324f78ad793b2aa9f92b4a4dd309960c71c9180d..2003270b2168d7f023f3718e1d8333a6b17d7994 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -142,12 +142,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_START_SYNC, "failed to start sync" TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_CREATE_DIR, "failed to create mnode dir") TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_INIT_STEP, "failed to init components") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, "Object already there") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, "Unexpected generic error in sdb") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE, "Invalid table type") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_NOT_THERE, "Object not there") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_META_ROW, "Invalid meta row") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_KEY_TYPE, "Invalid key type") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INTERNAL_ERROR, "Unexpected generic error in sdb") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OUT_OF_MEMORY, "Out of memory in sdb") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_NOT_THERE, "Object not there") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVAID_RAW_DATA_VER, "Invalid raw data version") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVAID_RAW_DATA_LEN, "Invalid raw data len") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_TABLE_TYPE, "Invalid table type") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVAID_META_ROW, "Invalid meta row") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVAID_KEY_TYPE, "Invalid key type") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "DNode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "DNode does not exist") @@ -166,12 +169,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, "Dnode Id not configur TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, "Dnode Ep not configured") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, "Invalid account") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_NOT_EXIST, "Invalid account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_OPTION, "Invalid account options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_EXPIRED, "Account authorization has expired") TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_ALREADY_EXIST, "User already exists") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_USER, "Invalid user") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_NOT_EXIST, "Invalid user") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_USER_FORMAT, "Invalid user format") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_PASS_FORMAT, "Invalid password format") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_USER_FROM_CONN, "Can not get user from conn")