未验证 提交 c57bdc18 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2713 from taosdata/feature/vnode

Feature/vnode
...@@ -77,11 +77,6 @@ typedef struct { ...@@ -77,11 +77,6 @@ typedef struct {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SSdbObject; } SSdbObject;
typedef struct {
int32_t rowSize;
void * row;
} SSdbRow;
typedef struct { typedef struct {
pthread_t thread; pthread_t thread;
int32_t workerId; int32_t workerId;
...@@ -419,7 +414,7 @@ void sdbDecRef(void *handle, void *pObj) { ...@@ -419,7 +414,7 @@ void sdbDecRef(void *handle, void *pObj) {
} }
} }
static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) { static void *sdbGetRowMeta(SSdbTable *pTable, void *key) {
if (pTable == NULL) return NULL; if (pTable == NULL) return NULL;
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
...@@ -427,24 +422,20 @@ static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) { ...@@ -427,24 +422,20 @@ static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) {
keySize = strlen((char *)key); keySize = strlen((char *)key);
} }
return taosHashGet(pTable->iHandle, key, keySize); void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize);
if (ppRow == NULL) return NULL;
return *ppRow;
} }
static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key)); return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key));
} }
void *sdbGetRow(void *handle, void *key) { void *sdbGetRow(void *handle, void *key) {
SSdbTable *pTable = (SSdbTable *)handle; void *pRow = sdbGetRowMeta(handle, key);
int32_t keySize = sizeof(int32_t); if (pRow) {
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { sdbIncRef(handle, pRow);
keySize = strlen((char *)key); return pRow;
}
SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize);
if (pMeta) {
sdbIncRef(pTable, pMeta->row);
return pMeta->row;
} else { } else {
return NULL; return NULL;
} }
...@@ -455,10 +446,6 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) { ...@@ -455,10 +446,6 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
} }
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
SSdbRow rowMeta;
rowMeta.rowSize = pOper->rowSize;
rowMeta.row = pOper->pObj;
void * key = sdbGetObjKey(pTable, pOper->pObj); void * key = sdbGetObjKey(pTable, pOper->pObj);
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
...@@ -466,7 +453,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -466,7 +453,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
keySize = strlen((char *)key); keySize = strlen((char *)key);
} }
taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow)); taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(void **));
sdbIncRef(pTable, pOper->pObj); sdbIncRef(pTable, pOper->pObj);
atomic_add_fetch_32(&pTable->numOfRows, 1); atomic_add_fetch_32(&pTable->numOfRows, 1);
...@@ -586,17 +573,17 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -586,17 +573,17 @@ static int sdbWrite(void *param, void *data, int type) {
code = (*pTable->decodeFp)(&oper); code = (*pTable->decodeFp)(&oper);
return sdbInsertHash(pTable, &oper); return sdbInsertHash(pTable, &oper);
} else if (action == SDB_ACTION_DELETE) { } else if (action == SDB_ACTION_DELETE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (rowMeta == NULL || rowMeta->row == NULL) { if (pRow == NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName, sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName,
pHead->cont); pHead->cont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSdbOper oper = {.table = pTable, .pObj = rowMeta->row}; SSdbOper oper = {.table = pTable, .pObj = pRow};
return sdbDeleteHash(pTable, &oper); return sdbDeleteHash(pTable, &oper);
} else if (action == SDB_ACTION_UPDATE) { } else if (action == SDB_ACTION_UPDATE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (rowMeta == NULL || rowMeta->row == NULL) { if (pRow == NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName, sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName,
pHead->cont); pHead->cont);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -675,18 +662,12 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -675,18 +662,12 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table; SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pMeta == NULL) { if (pRow == NULL) {
sdbDebug("table:%s, record is not there, delete failed", pTable->tableName); sdbDebug("table:%s, record is not there, delete failed", pTable->tableName);
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
} }
void *pMetaRow = pMeta->row;
if (pMetaRow == NULL) {
sdbError("table:%s, record meta is null", pTable->tableName);
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
}
sdbIncRef(pTable, pOper->pObj); sdbIncRef(pTable, pOper->pObj);
int32_t code = sdbDeleteHash(pTable, pOper); int32_t code = sdbDeleteHash(pTable, pOper);
...@@ -728,18 +709,12 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -728,18 +709,12 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table; SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pMeta == NULL) { if (pRow == NULL) {
sdbDebug("table:%s, record is not there, update failed", pTable->tableName); sdbDebug("table:%s, record is not there, update failed", pTable->tableName);
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
} }
void *pMetaRow = pMeta->row;
if (pMetaRow == NULL) {
sdbError("table:%s, record meta is null", pTable->tableName);
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
}
int32_t code = sdbUpdateHash(pTable, pOper); int32_t code = sdbUpdateHash(pTable, pOper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbError("table:%s, failed to update hash", pTable->tableName); sdbError("table:%s, failed to update hash", pTable->tableName);
...@@ -789,14 +764,14 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { ...@@ -789,14 +764,14 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
return NULL; return NULL;
} }
SSdbRow *pMeta = taosHashIterGet(pIter); void **ppMetaRow = taosHashIterGet(pIter);
if (pMeta == NULL) { if (ppMetaRow == NULL) {
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
return NULL; return NULL;
} }
*ppRow = pMeta->row; *ppRow = *ppMetaRow;
sdbIncRef(handle, pMeta->row); sdbIncRef(handle, *ppMetaRow);
return pIter; return pIter;
} }
...@@ -846,11 +821,11 @@ void sdbCloseTable(void *handle) { ...@@ -846,11 +821,11 @@ void sdbCloseTable(void *handle) {
SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle); SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle);
while (taosHashIterNext(pIter)) { while (taosHashIterNext(pIter)) {
SSdbRow *pMeta = taosHashIterGet(pIter); void **ppRow = taosHashIterGet(pIter);
if (pMeta == NULL) continue; if (ppRow == NULL) continue;
SSdbOper oper = { SSdbOper oper = {
.pObj = pMeta->row, .pObj = *ppRow,
.table = pTable, .table = pTable,
}; };
......
...@@ -107,42 +107,41 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) { ...@@ -107,42 +107,41 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId); mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId);
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
} }
mnodeDecVgroupRef(pVgroup);
SDbObj *pDb = mnodeGetDb(pVgroup->dbName); SDbObj *pDb = NULL;
if (pVgroup != NULL) {
pDb = mnodeGetDb(pVgroup->dbName);
if (pDb == NULL) { if (pDb == NULL) {
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_MND_INVALID_DB;
} }
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
} }
mnodeDecDbRef(pDb);
SAcctObj *pAcct = mnodeGetAcct(pDb->acct); SAcctObj *pAcct = NULL;
if (pDb != NULL) {
pAcct = mnodeGetAcct(pDb->acct);
if (pAcct == NULL) { if (pAcct == NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct); mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct);
return TSDB_CODE_MND_INVALID_ACCT;
} }
mnodeDecAcctRef(pAcct); }
if (pTable->info.type == TSDB_CHILD_TABLE) { if (pTable->info.type == TSDB_CHILD_TABLE) {
// add ref // add ref
pTable->superTable = mnodeGetSuperTableByUid(pTable->suid); pTable->superTable = mnodeGetSuperTableByUid(pTable->suid);
mnodeAddTableIntoStable(pTable->superTable, pTable); mnodeAddTableIntoStable(pTable->superTable, pTable);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
} else { } else {
grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
} }
mnodeAddTableIntoDb(pDb); if (pDb) mnodeAddTableIntoDb(pDb);
mnodeAddTableIntoVgroup(pVgroup, pTable); if (pVgroup) mnodeAddTableIntoVgroup(pVgroup, pTable);
mnodeDecVgroupRef(pVgroup);
mnodeDecDbRef(pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册