提交 e9a07987 编写于 作者: S slguan

[TD-15] refactor sdb

上级 4b7e463b
......@@ -23,6 +23,9 @@ extern "C" {
int32_t mgmtInitMnodes();
void mgmtCleanupMnodes();
bool mgmtInServerStatus();
bool mgmtIsMaster();
bool mgmtCheckRedirect(void *handle);
void mgmtGetMnodePrivateIpList(SRpcIpSet *ipSet);
void mgmtGetMnodePublicIpList(SRpcIpSet *ipSet);
......
......@@ -20,27 +20,11 @@
extern "C" {
#endif
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "hashint.h"
#include "hashstr.h"
#include "tchecksum.h"
#include "tlog.h"
#include "trpc.h"
#include "tutil.h"
enum _keytype {
SDB_KEYTYPE_STRING,
SDB_KEYTYPE_AUTO,
SDB_KEYTYPE_MAX
};
} ESdbKeyType;
enum _sdbaction {
SDB_TYPE_INSERT,
......@@ -50,11 +34,15 @@ enum _sdbaction {
SDB_TYPE_ENCODE,
SDB_TYPE_DESTROY,
SDB_MAX_ACTION_TYPES
};
} ESdbType;
typedef enum {
SDB_OPER_GLOBAL,
SDB_OPER_LOCAL,
SDB_OPER_DISK
} ESdbOper;
uint64_t sdbGetVersion();
bool sdbInServerState();
bool sdbIsMaster();
void *sdbOpenTable(int32_t maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory,
void *(*appTool)(char, void *, char *, int32_t, int32_t *));
......@@ -65,9 +53,9 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
int64_t sdbGetId(void *handle);
int64_t sdbGetNumOfRows(void *handle);
int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize);
int32_t sdbDeleteRow(void *handle, void *key);
int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated);
int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper);
int32_t sdbDeleteRow(void *handle, void *key, ESdbOper oper);
int32_t sdbUpdateRow(void *handle, void *row, int32_t rowSize, ESdbOper oper);
#ifdef __cplusplus
}
......
......@@ -29,6 +29,7 @@
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
......@@ -94,7 +95,7 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss
return NULL;
}
if (!sdbIsMaster()) {
if (!mgmtIsMaster()) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("ctable:%s, sid:%d is not matched from the master:%d", pTable->tableId, sid, pTable->sid);
......@@ -214,7 +215,7 @@ int32_t mgmtInitChildTables() {
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("ctable:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -223,7 +224,7 @@ int32_t mgmtInitChildTables() {
if (pVgroup == NULL) {
mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -232,7 +233,7 @@ int32_t mgmtInitChildTables() {
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -240,7 +241,7 @@ int32_t mgmtInitChildTables() {
if (pVgroup->tableList == NULL) {
mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -252,7 +253,7 @@ int32_t mgmtInitChildTables() {
if (pSuperTable == NULL) {
mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -337,7 +338,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
pTable->vgId = pVgroup->vgId;
pTable->superTable = pSuperTable;
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
if (sdbInsertRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
free(pTable);
mError("ctable:%s, update sdb error", pCreate->tableId);
terrno = TSDB_CODE_SDB_ERROR;
......@@ -481,7 +482,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_LOCAL);
pNode = pLastNode;
numOfTables ++;
continue;
......
......@@ -279,7 +279,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
pDb->createdTime = taosGetTimestampMs();
pDb->cfg = *pCreate;
if (sdbInsertRow(tsDbSdb, pDb, 0) < 0) {
if (sdbInsertRow(tsDbSdb, pDb, SDB_OPER_GLOBAL) < 0) {
code = TSDB_CODE_SDB_ERROR;
tfree(pDb);
}
......@@ -819,7 +819,7 @@ static void mgmtDropDb(void *handle, void *tmrId) {
SDbObj *pDb = newMsg->ahandle;
mPrint("db:%s, drop db from sdb", pDb->name);
int32_t code = sdbDeleteRow(tsDbSdb, pDb);
int32_t code = sdbDeleteRow(tsDbSdb, pDb, SDB_OPER_GLOBAL);
if (code != 0) {
code = TSDB_CODE_SDB_ERROR;
}
......
......@@ -127,7 +127,7 @@ int32_t mgmtStartSystem() {
void mgmtStopSystem() {
if (sdbIsMaster()) {
if (mgmtIsMaster()) {
mTrace("it is a master mgmt node, it could not be stopped");
return;
}
......
......@@ -29,6 +29,8 @@ int32_t (*mpeerInitMnodesFp)() = NULL;
void (*mpeerCleanUpMnodesFp)() = NULL;
static SMnodeObj tsMnodeObj = {0};
static bool tsMnodeIsMaster = false;
static bool tsMnodeIsServing = false;
static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -52,6 +54,8 @@ int32_t mgmtInitMnodes() {
if (mpeerInitMnodesFp) {
return (*mpeerInitMnodesFp)();
} else {
tsMnodeIsServing = true;
tsMnodeIsMaster = true;
return 0;
}
}
......@@ -62,6 +66,14 @@ void mgmtCleanupMnodes() {
}
}
bool mgmtInServerStatus() {
return tsMnodeIsServing;
}
bool mgmtIsMaster() {
return tsMnodeIsMaster;
}
bool mgmtCheckRedirect(void *handle) {
return false;
}
......
......@@ -26,6 +26,7 @@
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtSdb.h"
#include "mgmtSuperTable.h"
......@@ -98,7 +99,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s
return NULL;
}
if (!sdbIsMaster()) {
if (!mgmtIsMaster()) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
......@@ -237,7 +238,7 @@ int32_t mgmtInitNormalTables() {
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("ntable:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -246,7 +247,7 @@ int32_t mgmtInitNormalTables() {
if (pVgroup == NULL) {
mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -255,7 +256,7 @@ int32_t mgmtInitNormalTables() {
mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -263,7 +264,7 @@ int32_t mgmtInitNormalTables() {
if (pVgroup->tableList == NULL) {
mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -370,7 +371,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t
mTrace("table:%s, stream sql len:%d sql:%s", pTable->tableId, pTable->sqlLen, pTable->sql);
}
if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) {
if (sdbInsertRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
mError("table:%s, update sdb error", pTable->tableId);
free(pTable);
terrno = TSDB_CODE_SDB_ERROR;
......@@ -467,7 +468,7 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int3
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries += ncols;
sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1);
sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -496,7 +497,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName)
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries--;
sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1);
sdbUpdateRow(tsNormalTableSdb, pTable, tsNormalTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -555,7 +556,7 @@ void mgmtDropAllNormalTables(SDbObj *pDropDb) {
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsNormalTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_LOCAL);
pNode = pLastNode;
numOfTables ++;
continue;
......
......@@ -17,15 +17,18 @@
#include "os.h"
#include "taosdef.h"
#include "tutil.h"
#include "tchecksum.h"
#include "tlog.h"
#include "trpc.h"
#include "tutil.h"
#include "hashint.h"
#include "hashstr.h"
#include "mgmtSdb.h"
#define abs(x) (((x) < 0) ? -(x) : (x))
#define SDB_MAX_PEERS 4
#define SDB_DELIMITER 0xFFF00F00
#define SDB_ENDCOMMIT 0xAFFFAAAF
#define SDB_STATUS_OFFLINE 0
#define SDB_STATUS_SERVING 1
typedef struct {
uint64_t swVersion;
......@@ -81,15 +84,6 @@ int32_t (*mpeerForwardRequestFp)(SSdbTable *pTable, char type, void *cont, int32
static SSdbTable *sdbTableList[10] = {0};
static int32_t sdbNumOfTables = 0;
static uint64_t sdbVersion = 0;
static int32_t sdbMaster = 0;
static int32_t sdbStatus = SDB_STATUS_OFFLINE;
// #ifdef CLUSTER
// int32_t sdbMaster = 0;
// #else
// int32_t sdbMaster = 1;
// #endif
static void *(*sdbInitIndexFp[])(int32_t maxRows, int32_t dataSize) = {sdbOpenStrHash, sdbOpenIntHash};
static void *(*sdbAddIndexFp[])(void *handle, void *key, void *data) = {sdbAddStrHash, sdbAddIntHash};
......@@ -102,8 +96,6 @@ void sdbResetTable(SSdbTable *pTable);
void sdbSaveSnapShot(void *handle);
uint64_t sdbGetVersion() { return sdbVersion; }
bool sdbInServerState() { return sdbStatus == SDB_STATUS_SERVING; }
bool sdbIsMaster() { return sdbMaster; }
int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->id; }
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
......@@ -319,6 +311,10 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
pTable->size += real_size;
if (pTable->id < abs(rowHead->id)) pTable->id = abs(rowHead->id);
//TODO: check this valid
pTable->size += 4;
lseek(pTable->fd, 4, SEEK_CUR);
}
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
......@@ -390,81 +386,67 @@ void *sdbGetRow(void *handle, void *key) {
}
// row here must be encoded string (rowSize > 0) or the object it self (rowSize = 0)
int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize) {
int32_t sdbInsertRow(void *handle, void *row, ESdbOper oper) {
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta rowMeta;
int64_t id = -1;
void * pObj = NULL;
int32_t total_size = 0;
int32_t real_size = 0;
/* char action = SDB_TYPE_INSERT; */
int32_t total_size = 0;
int32_t real_size = 0;
if (pTable == NULL) {
sdbError("sdb tables is null");
return -1;
}
if ((pTable->keyType != SDB_KEYTYPE_AUTO) || *((int64_t *)row))
if (sdbGetRow(handle, row)) {
if (strcmp(pTable->name, "mnode") == 0) {
/*
* The first mnode created when the system just start, so the insert action may failed
* see sdbPeer.c : sdbInitPeers
*/
pTable->id++;
sdbVersion++;
sdbPrint("table:%s, record:%s already exist, think it successed, sdbVersion:%" PRId64 " id:%" PRId64,
pTable->name, taosIpStr(*(int32_t *)row), sdbVersion, pTable->id);
return 0;
} else {
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbError("table:%s, failed to insert record:%s sdbVersion:%" PRId64 " id:%" PRId64 , pTable->name, (char *)row, sdbVersion, pTable->id);
break;
case SDB_KEYTYPE_AUTO:
sdbError("table:%s, failed to insert record:%d sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, *(int32_t *)row, sdbVersion, pTable->id);
break;
default:
sdbError("table:%s, failed to insert record sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, sdbVersion, pTable->id);
break;
}
return -1;
}
if (sdbGetRow(handle, row)) {
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbError("table:%s, failed to insert record:%s sdbVersion:%" PRId64 " id:%" PRId64 , pTable->name, (char *)row, sdbVersion, pTable->id);
break;
case SDB_KEYTYPE_AUTO:
sdbError("table:%s, failed to insert record:%d sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, *(int32_t *)row, sdbVersion, pTable->id);
break;
default:
sdbError("table:%s, failed to insert record sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, sdbVersion, pTable->id);
break;
}
return -1;
}
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name);
sdbError("table:%s, failed to allocate row head memory", pTable->name);
return -1;
}
memset(rowHead, 0, total_size);
if (rowSize == 0) { // object is created already
if (oper == SDB_OPER_GLOBAL) {
pObj = row;
} else { // encoded string, to create object
pObj = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, row, rowSize, NULL);
}
(*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize));
assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize);
} else {
pObj = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, row, 0, NULL);
}
pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) {
pTable->id++;
sdbVersion++;
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
// TODO:here need to change
*((uint32_t *)pObj) = ++pTable->autoIndex;
(*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize));
}
if (oper == SDB_OPER_GLOBAL) {
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) != 0) {
sdbError("table:%s, failed to insert record", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
}
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (oper == SDB_OPER_GLOBAL || oper == SDB_OPER_LOCAL) {
(*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize));
assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize);
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
rowHead->id = pTable->id + 1;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
sdbError("table:%s, failed to get checksum while inserting", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
......@@ -477,14 +459,10 @@ int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize) {
rowMeta.row = pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pObj, &rowMeta);
/* Update the disk content */
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
twrite(pTable->fd, rowHead, real_size);
pTable->size += real_size;
sdbFinishCommit(pTable);
pTable->numOfRows++;
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64,
......@@ -499,32 +477,33 @@ int64_t sdbInsertRow(void *handle, void *row, int32_t rowSize) {
pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size);
break;
}
}
id = rowMeta.id;
} else {
sdbError("table:%s, failed to insert record", pTable->name);
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
*((uint32_t *)pObj) = ++pTable->autoIndex;
}
tfree(rowHead);
pTable->numOfRows++;
pTable->id++;
sdbVersion++;
pthread_mutex_unlock(&pTable->mutex);
/* callback function to update the MGMT layer */
if (id >= 0 && pTable->appTool) (*pTable->appTool)(SDB_TYPE_INSERT, pObj, NULL, 0, NULL);
(*pTable->appTool)(SDB_TYPE_INSERT, pObj, NULL, 0, NULL);
tfree(rowHead);
return id;
return 0;
}
// row here can be object or null-terminated string
int32_t sdbDeleteRow(void *handle, void *row) {
int32_t sdbDeleteRow(void *handle, void *row, ESdbOper oper) {
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta * pMeta = NULL;
int32_t code = -1;
void * pMetaRow = NULL;
SRowHead * rowHead = NULL;
int32_t rowSize = 0;
int32_t total_size = 0;
/* char action = SDB_TYPE_DELETE; */
int32_t rowSize = 0;
int32_t total_size = 0;
if (pTable == NULL) return -1;
......@@ -558,67 +537,67 @@ int32_t sdbDeleteRow(void *handle, void *row) {
pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) {
pTable->id++;
sdbVersion++;
rowHead->delimiter = SDB_DELIMITER;
rowHead->rowSize = rowSize;
rowHead->id = -(pTable->id);
memcpy(rowHead->data, row, rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
if (oper == SDB_OPER_GLOBAL) {
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) {
sdbError("table:%s, failed to delete record", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
twrite(pTable->fd, rowHead, total_size);
pTable->size += total_size;
sdbFinishCommit(pTable);
}
}
pTable->numOfRows--;
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
case SDB_KEYTYPE_AUTO:
sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
default:
sdbTrace("table:%s, a record is deleted, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->name, sdbVersion, pTable->id, pTable->numOfRows);
break;
}
rowHead->delimiter = SDB_DELIMITER;
rowHead->rowSize = rowSize;
rowHead->id = -(pTable->id);
memcpy(rowHead->data, row, rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
// Delete from current layer
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, row);
twrite(pTable->fd, rowHead, total_size);
pTable->size += total_size;
sdbFinishCommit(pTable);
code = 0;
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
case SDB_KEYTYPE_AUTO:
sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
default:
sdbTrace("table:%s, a record is deleted, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d",
pTable->name, sdbVersion, pTable->id, pTable->numOfRows);
break;
}
// Delete from current layer
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, row);
pTable->numOfRows--;
pTable->id++;
sdbVersion++;
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
// callback function of the delete
if (code == 0 && pTable->appTool) (*pTable->appTool)(SDB_TYPE_DELETE, pMetaRow, NULL, 0, NULL);
(*pTable->appTool)(SDB_TYPE_DELETE, pMetaRow, NULL, 0, NULL);
return code;
return 0;
}
// row here can be the object or the string info (encoded string)
int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated) {
int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOper oper) {
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta * pMeta = NULL;
int32_t code = -1;
int32_t total_size = 0;
int32_t real_size = 0;
/* char action = SDB_TYPE_UPDATE; */
int32_t total_size = 0;
int32_t real_size = 0;
if (pTable == NULL || row == NULL) return -1;
pMeta = sdbGetRowMeta(handle, row);
......@@ -651,8 +630,15 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated
}
memset(rowHead, 0, total_size);
if (!isUpdated) {
(*(pTable->appTool))(SDB_TYPE_UPDATE, pMetaRow, row, updateSize, NULL); // update in upper layer
pthread_mutex_lock(&pTable->mutex);
if (oper == SDB_OPER_GLOBAL) {
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) {
sdbError("table:%s, failed to update record", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
}
if (pMetaRow != row) {
......@@ -663,57 +649,51 @@ int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, char isUpdated
}
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
;
pthread_mutex_lock(&pTable->mutex);
if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) {
pTable->id++;
sdbVersion++;
// write to the new position
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
/* write(pTable->fd, &action, sizeof(action)); */
/* pTable->size += sizeof(action); */
twrite(pTable->fd, rowHead, real_size);
// write to the new position
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
}
twrite(pTable->fd, rowHead, real_size);
pMeta->id = pTable->id;
pMeta->offset = pTable->size;
pMeta->rowSize = rowHead->rowSize;
pTable->size += real_size;
pMeta->id = pTable->id;
pMeta->offset = pTable->size;
pMeta->rowSize = rowHead->rowSize;
pTable->size += real_size;
sdbFinishCommit(pTable);
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is updated:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64,
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
case SDB_KEYTYPE_AUTO:
sdbTrace("table:%s, a record is updated:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64,
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
default:
sdbTrace("table:%s, a record is updated, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, pTable->name, sdbVersion,
pTable->id, pTable->numOfRows);
break;
}
sdbFinishCommit(pTable);
code = 0;
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
sdbTrace("table:%s, a record is updated:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64,
pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
case SDB_KEYTYPE_AUTO:
sdbTrace("table:%s, a record is updated:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64,
pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows);
break;
default:
sdbTrace("table:%s, a record is updated, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, pTable->name, sdbVersion,
pTable->id, pTable->numOfRows);
break;
}
pTable->id++;
sdbVersion++;
pthread_mutex_unlock(&pTable->mutex);
(*(pTable->appTool))(SDB_TYPE_UPDATE, pMetaRow, row, updateSize, NULL); // update in upper layer
tfree(rowHead);
return code;
return 0;
}
void sdbCloseTable(void *handle) {
......
......@@ -132,7 +132,7 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
}
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (!sdbInServerState()) {
if (!mgmtInServerStatus()) {
mgmtProcessMsgWhileNotReady(rpcMsg);
rpcFreeCont(rpcMsg->pCont);
return;
......
......@@ -181,7 +181,7 @@ int32_t mgmtInitSuperTables() {
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("super table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsSuperTableSdb, pTable);
sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_DISK);
pNode = pLastNode;
continue;
}
......@@ -233,7 +233,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
tschema[col].bytes = htons(tschema[col].bytes);
}
if (sdbInsertRow(tsSuperTableSdb, pStable, 0) < 0) {
if (sdbInsertRow(tsSuperTableSdb, pStable, SDB_OPER_GLOBAL) < 0) {
mError("stable:%s, update sdb error", pStable->tableId);
return TSDB_CODE_SDB_ERROR;
}
......@@ -319,7 +319,7 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t
pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->tableId);
return TSDB_CODE_SUCCESS;
......@@ -352,7 +352,7 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize);
sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -384,7 +384,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN
mgmtSuperTableActionEncode(pStable, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1);
int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
tfree(msg);
if (ret < 0) {
......@@ -446,7 +446,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32
pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -479,7 +479,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
pStable->schema = realloc(pStable->schema, schemaSize);
pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
return TSDB_CODE_SUCCESS;
}
......@@ -618,7 +618,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
}
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
sdbDeleteRow(tsSuperTableSdb, pTable);
sdbDeleteRow(tsSuperTableSdb, pTable, SDB_OPER_GLOBAL);
pNode = pLastNode;
numOfTables ++;
continue;
......
......@@ -757,9 +757,9 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
if (pTable->type == TSDB_CHILD_TABLE) {
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL);
} else if (pTable->type == TSDB_NORMAL_TABLE){
sdbDeleteRow(tsNormalTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL);
} else {}
mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code));
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
......@@ -813,14 +813,14 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
}
if (pTable->type == TSDB_CHILD_TABLE) {
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
if (sdbDeleteRow(tsChildTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
mError("table:%s, update ctables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
return;
}
} else if (pTable->type == TSDB_NORMAL_TABLE){
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
if (sdbDeleteRow(tsNormalTableSdb, pTable, SDB_OPER_GLOBAL) < 0) {
mError("table:%s, update ntables sdb error", pTable->tableId);
mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR);
free(queueMsg);
......
......@@ -103,7 +103,7 @@ SUserObj *mgmtGetUser(char *name) {
}
static int32_t mgmtUpdateUser(SUserObj *pUser) {
return sdbUpdateRow(tsUserSdb, pUser, 0, 1);
return sdbUpdateRow(tsUserSdb, pUser, tsUserUpdateSize, SDB_OPER_GLOBAL);
}
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
......@@ -140,7 +140,7 @@ static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
}
code = TSDB_CODE_SUCCESS;
if (sdbInsertRow(tsUserSdb, pUser, 0) < 0) {
if (sdbInsertRow(tsUserSdb, pUser, SDB_OPER_GLOBAL) < 0) {
tfree(pUser);
code = TSDB_CODE_SDB_ERROR;
}
......@@ -161,7 +161,7 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
return TSDB_CODE_NO_RIGHTS;
}
sdbDeleteRow(tsUserSdb, pUser);
sdbDeleteRow(tsUserSdb, pUser, SDB_OPER_GLOBAL);
return 0;
}
......
......@@ -112,7 +112,7 @@ int32_t mgmtInitVgroups() {
if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) {
pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp);
pVgroup->vnodeGid[0].privateIp = inet_addr(tsPrivateIp);
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1);
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL);
}
// mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
......@@ -161,7 +161,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
mgmtAddVgroupIntoDb(pDb, pVgroup);
// mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
sdbInsertRow(tsVgroupSdb, pVgroup, 0);
sdbInsertRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
......@@ -179,7 +179,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
} else {
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup);
sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
}
}
......@@ -474,7 +474,7 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t
}
void mgmtUpdateVgroup(SVgObj *pVgroup) {
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0);
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_LOCAL);
}
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) {
......@@ -607,7 +607,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
mgmtAddToShellQueue(newMsg);
} else {
sdbDeleteRow(tsVgroupSdb, pVgroup);
sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
}
......@@ -661,7 +661,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
if (queueMsg->received != queueMsg->expected) return;
sdbDeleteRow(tsVgroupSdb, pVgroup);
sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
......@@ -691,7 +691,7 @@ void mgmtUpdateVgroupIp(SDnodeObj *pDnode) {
pDnode->publicIp, taosIpStr(vnodeGid->publicIp));
vnodeGid->publicIp = pDnode->publicIp;
vnodeGid->privateIp = pDnode->privateIp;
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1);
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册