提交 7466267b 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

......@@ -52,12 +52,20 @@ typedef struct STableComInfo {
int32_t rowSize;
} STableComInfo;
typedef struct SCMCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo;
typedef struct STableMeta {
STableComInfo tableInfo;
uint8_t tableType;
int16_t sversion;
int16_t tversion;
SCMVgroupInfo vgroupInfo;
SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo;
int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
......@@ -456,7 +464,8 @@ extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern SRpcEpSet tscMgmtEpSet;
extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
......@@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() {
strcpy(s.name, TSQL_TBNAME_L);
return s;
}
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) {
strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port;
}
}
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
assert(pTableMetaMsg != NULL);
......@@ -157,6 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
......
......@@ -26,10 +26,11 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "tlockfree.h"
#define TSC_MGMT_VNODE 999
SRpcEpSet tscMgmtEpSet;
SRpcCorEpSet tscMgmtEpSet;
SRpcEpSet tscDnodeEpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
......@@ -51,37 +52,78 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
pEpSet->numOfEps = 0;
return;
}
pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
}
}
void tscPrintMgmtEp() {
if (tscMgmtEpSet.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps);
} else {
for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
}
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
taosCorBeginRead(&tscMgmtEpSet.version);
*epSet = tscMgmtEpSet.epSet;
taosCorEndRead(&tscMgmtEpSet.version);
}
static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) {
s->port[i] = htons(s->port[i]);
}
}
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->port[i] != s2->port[i]
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
// no need to update if equal
taosCorBeginWrite(&tscMgmtEpSet.version);
tscMgmtEpSet.epSet = *pEpSet;
taosCorEndWrite(&tscMgmtEpSet.version);
}
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
if (pVgroupInfo == NULL) { return;}
taosCorBeginRead(&pVgroupInfo->version);
int8_t inUse = pVgroupInfo->inUse;
pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
pEpSet->numOfEps = pVgroupInfo->numOfEps;
for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
}
taosCorEndRead(&pVgroupInfo->version);
}
void tscSetMgmtEpSet(SRpcEpSet *pEpSet) {
tscMgmtEpSet.numOfEps = pEpSet->numOfEps;
tscMgmtEpSet.inUse = pEpSet->inUse;
for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscMgmtEpSet.port[i] = htons(pEpSet->port[i]);
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pObj->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
taosCorBeginWrite(&pVgroupInfo->version);
//TODO(dengyihao), dont care vgid
pVgroupInfo->inUse = pEpSet->inUse;
pVgroupInfo->numOfEps = pEpSet->numOfEps;
for (int32_t i = 0; pVgroupInfo->numOfEps; i++) {
strncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
pVgroupInfo->epAddr[i].port = pEpSet->port[i];
}
taosCorEndWrite(&pVgroupInfo->version);
}
void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
tscMgmtEpSet = *pEpSet;
tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse);
for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
void tscPrintMgmtEp() {
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
if (dump.numOfEps <= 0) {
tscError("invalid mnode EP list:%d", dump.numOfEps);
} else {
for (int i = 0; i < dump.numOfEps; ++i) {
tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
}
}
}
......@@ -95,7 +137,9 @@ void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2;
return tscMgmtEpSet.numOfEps * factor;
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
return dump.numOfEps * factor;
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
......@@ -111,9 +155,11 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (code == 0) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SRpcEpSet * pEpSet = &pRsp->epSet;
if (pEpSet->numOfEps > 0)
tscSetMgmtEpSet(pEpSet);
SRpcEpSet * epSet = &pRsp->epSet;
if (epSet->numOfEps > 0) {
tscEpSetHtons(epSet);
tscUpdateMgmtEpSet(epSet);
}
pSql->pTscObj->connId = htonl(pRsp->connId);
......@@ -185,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
// set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
pSql->epSet = tscMgmtEpSet;
tscDumpMgmtEpSet(&pSql->epSet);
}
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
......@@ -236,10 +282,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
if (pCmd->command < TSDB_SQL_MGMT) {
if (pEpSet) pSql->epSet = *pEpSet;
} else {
if (pEpSet) tscMgmtEpSet = *pEpSet;
if (pEpSet) {
//SRpcEpSet dump;
tscEpSetHtons(pEpSet);
if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
if(pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet);
} else {
tscUpdateMgmtEpSet(pEpSet);
}
}
}
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
......@@ -400,7 +452,8 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code;
}
} else if (pCmd->command < TSDB_SQL_LOCAL) {
pSql->epSet = tscMgmtEpSet;
//pSql->epSet = tscMgmtEpSet;
} else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql);
}
......@@ -504,8 +557,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->epSet.numOfEps);
return TSDB_CODE_SUCCESS;
......@@ -546,11 +599,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else {
pVgroupInfo = &pTableMeta->vgroupInfo;
}
tscSetDnodeEpSet(pSql, pVgroupInfo);
if (pVgroupInfo != NULL) {
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
}
}
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid);
......@@ -567,8 +620,8 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info
// set the vgroup info
tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
......@@ -1115,11 +1168,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShowMsg->payloadLen = htons(pPattern->n);
}
} else {
SSQLToken *pIpAddr = &pShowInfo->prefix;
assert(pIpAddr->n > 0 && pIpAddr->type > 0);
SSQLToken *pEpAddr = &pShowInfo->prefix;
assert(pEpAddr->n > 0 && pEpAddr->type > 0);
strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
pShowMsg->payloadLen = htons(pIpAddr->n);
strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
pShowMsg->payloadLen = htons(pEpAddr->n);
}
pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
......@@ -1302,7 +1355,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
return TSDB_CODE_SUCCESS;
}
......@@ -1826,13 +1879,14 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
memcpy(pInfo->vgroupList, pVgroupInfo, size);
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
//just init, no need to lock
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
pVgroups->vgId = htonl(pVgroups->vgId);
assert(pVgroups->numOfEps >= 1);
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
}
pMsg += size;
......@@ -1925,8 +1979,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= sizeof(pObj->db));
tstrncpy(pObj->db, temp, sizeof(pObj->db));
if (pConnect->epSet.numOfEps > 0)
tscSetMgmtEpSet(&pConnect->epSet);
if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet);
}
strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth;
......
......@@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
if (ip) {
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtEpSet.port[0] = port;
if (port) tscMgmtEpSet.epSet.port[0] = port;
}
void *pDnodeConn = NULL;
......
......@@ -41,7 +41,7 @@ int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk();
......
......@@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) {
}
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
tscMgmtEpSet.numOfEps = 0;
tscMgmtEpSet.inUse = 0;
// init mgmt ip set
tscMgmtEpSet.version = 0;
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (first && first[0] != 0) {
if (strlen(first) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]);
tscMgmtEpSet.numOfEps++;
taosGetFqdnPortFromEp(first, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
mgmtEpSet->numOfEps++;
}
if (second && second[0] != 0) {
......@@ -2163,11 +2166,11 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]);
tscMgmtEpSet.numOfEps++;
taosGetFqdnPortFromEp(second, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
mgmtEpSet->numOfEps++;
}
if ( tscMgmtEpSet.numOfEps == 0) {
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
......
......@@ -35,6 +35,11 @@ typedef struct SRpcEpSet {
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet;
typedef struct SRpcCorEpSet {
int32_t version;
SRpcEpSet epSet;
} SRpcCorEpSet;
typedef struct SRpcConnInfo {
uint32_t clientIp;
uint16_t clientPort;
......
......@@ -77,11 +77,6 @@ typedef struct {
pthread_mutex_t mutex;
} SSdbObject;
typedef struct {
int32_t rowSize;
void * row;
} SSdbRow;
typedef struct {
pthread_t thread;
int32_t workerId;
......@@ -419,32 +414,28 @@ void sdbDecRef(void *handle, void *pObj) {
}
}
static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) {
static void *sdbGetRowMeta(SSdbTable *pTable, void *key) {
if (pTable == NULL) return NULL;
int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key);
}
return taosHashGet(pTable->iHandle, key, keySize);
void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize);
if (ppRow == NULL) return NULL;
return *ppRow;
}
static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key));
}
void *sdbGetRow(void *handle, void *key) {
SSdbTable *pTable = (SSdbTable *)handle;
int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key);
}
SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize);
if (pMeta) {
sdbIncRef(pTable, pMeta->row);
return pMeta->row;
void *pRow = sdbGetRowMeta(handle, key);
if (pRow) {
sdbIncRef(handle, pRow);
return pRow;
} else {
return NULL;
}
......@@ -455,10 +446,6 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
}
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
SSdbRow rowMeta;
rowMeta.rowSize = pOper->rowSize;
rowMeta.row = pOper->pObj;
void * key = sdbGetObjKey(pTable, pOper->pObj);
int32_t keySize = sizeof(int32_t);
......@@ -466,7 +453,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
keySize = strlen((char *)key);
}
taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow));
taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(void **));
sdbIncRef(pTable, pOper->pObj);
atomic_add_fetch_32(&pTable->numOfRows, 1);
......@@ -586,17 +573,17 @@ static int sdbWrite(void *param, void *data, int type) {
code = (*pTable->decodeFp)(&oper);
return sdbInsertHash(pTable, &oper);
} else if (action == SDB_ACTION_DELETE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
if (rowMeta == NULL || rowMeta->row == NULL) {
void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (pRow == NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName,
pHead->cont);
return TSDB_CODE_SUCCESS;
}
SSdbOper oper = {.table = pTable, .pObj = rowMeta->row};
SSdbOper oper = {.table = pTable, .pObj = pRow};
return sdbDeleteHash(pTable, &oper);
} else if (action == SDB_ACTION_UPDATE) {
SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
if (rowMeta == NULL || rowMeta->row == NULL) {
void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (pRow == NULL) {
sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName,
pHead->cont);
return TSDB_CODE_SUCCESS;
......@@ -675,18 +662,12 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pMeta == NULL) {
void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pRow == NULL) {
sdbDebug("table:%s, record is not there, delete failed", pTable->tableName);
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
}
void *pMetaRow = pMeta->row;
if (pMetaRow == NULL) {
sdbError("table:%s, record meta is null", pTable->tableName);
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
}
sdbIncRef(pTable, pOper->pObj);
int32_t code = sdbDeleteHash(pTable, pOper);
......@@ -728,18 +709,12 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pMeta == NULL) {
void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj);
if (pRow == NULL) {
sdbDebug("table:%s, record is not there, update failed", pTable->tableName);
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
}
void *pMetaRow = pMeta->row;
if (pMetaRow == NULL) {
sdbError("table:%s, record meta is null", pTable->tableName);
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
}
int32_t code = sdbUpdateHash(pTable, pOper);
if (code != TSDB_CODE_SUCCESS) {
sdbError("table:%s, failed to update hash", pTable->tableName);
......@@ -789,14 +764,14 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
return NULL;
}
SSdbRow *pMeta = taosHashIterGet(pIter);
if (pMeta == NULL) {
void **ppMetaRow = taosHashIterGet(pIter);
if (ppMetaRow == NULL) {
taosHashDestroyIter(pIter);
return NULL;
}
*ppRow = pMeta->row;
sdbIncRef(handle, pMeta->row);
*ppRow = *ppMetaRow;
sdbIncRef(handle, *ppMetaRow);
return pIter;
}
......@@ -846,11 +821,11 @@ void sdbCloseTable(void *handle) {
SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle);
while (taosHashIterNext(pIter)) {
SSdbRow *pMeta = taosHashIterGet(pIter);
if (pMeta == NULL) continue;
void **ppRow = taosHashIterGet(pIter);
if (ppRow == NULL) continue;
SSdbOper oper = {
.pObj = pMeta->row,
.pObj = *ppRow,
.table = pTable,
};
......
......@@ -107,42 +107,41 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId);
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
}
mnodeDecVgroupRef(pVgroup);
SDbObj *pDb = mnodeGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_MND_INVALID_DB;
}
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
SDbObj *pDb = NULL;
if (pVgroup != NULL) {
pDb = mnodeGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
}
}
mnodeDecDbRef(pDb);
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
if (pAcct == NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct);
return TSDB_CODE_MND_INVALID_ACCT;
SAcctObj *pAcct = NULL;
if (pDb != NULL) {
pAcct = mnodeGetAcct(pDb->acct);
if (pAcct == NULL) {
mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct);
}
}
mnodeDecAcctRef(pAcct);
if (pTable->info.type == TSDB_CHILD_TABLE) {
// add ref
pTable->superTable = mnodeGetSuperTableByUid(pTable->suid);
mnodeAddTableIntoStable(pTable->superTable, pTable);
grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
} else {
grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1);
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
}
mnodeAddTableIntoDb(pDb);
mnodeAddTableIntoVgroup(pVgroup, pTable);
if (pDb) mnodeAddTableIntoDb(pDb);
if (pVgroup) mnodeAddTableIntoVgroup(pVgroup, pTable);
mnodeDecVgroupRef(pVgroup);
mnodeDecDbRef(pDb);
mnodeDecAcctRef(pAcct);
return TSDB_CODE_SUCCESS;
}
......
......@@ -75,7 +75,7 @@ void taosRUnLockLatch(SRWLatch *pLatch);
// copy on read
#define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \
int32_t old_ = atomic_load_32(x); \
int32_t old_ = atomic_add_fetch_32((x), 0); \
if (old_ & 0x00000001) { \
if (i_ % 1000 == 0) { \
sched_yield(); \
......@@ -84,7 +84,7 @@ void taosRUnLockLatch(SRWLatch *pLatch);
}
#define taosCorEndRead(x) \
if (atomic_load_32(x) == old_) { \
if (atomic_add_fetch_32((x), 0) == old_) { \
break; \
} \
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册