提交 c1e1dd64 编写于 作者: S Shengliang Guan

TD-2045

上级 98ec34b4
...@@ -52,11 +52,11 @@ typedef enum { ...@@ -52,11 +52,11 @@ typedef enum {
typedef struct SSWriteMsg { typedef struct SSWriteMsg {
ESdbOper type; ESdbOper type;
int32_t processedCount; // for sync fwd callback int32_t processedCount; // for sync fwd callback
int32_t retCode; // for callback in sdb queue int32_t code; // for callback in sdb queue
int32_t rowSize; int32_t rowSize;
void * rowData; void * rowData;
int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpWrite)(SMnodeMsg *pMsg, int32_t code); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
void * pRow; void * pRow;
SMnodeMsg *pMsg; SMnodeMsg *pMsg;
struct SSdbTable *pTable; struct SSdbTable *pTable;
......
...@@ -418,7 +418,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * ...@@ -418,7 +418,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
.pRow = pDb, .pRow = pDb,
.rowSize = sizeof(SDbObj), .rowSize = sizeof(SDbObj),
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeCreateDbCb .fpRsp = mnodeCreateDbCb
}; };
code = sdbInsertRow(&wmsg); code = sdbInsertRow(&wmsg);
...@@ -1024,7 +1024,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { ...@@ -1024,7 +1024,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
.pTable = tsDbSdb, .pTable = tsDbSdb,
.pRow = pDb, .pRow = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeAlterDbCb .fpRsp = mnodeAlterDbCb
}; };
code = sdbUpdateRow(&wmsg); code = sdbUpdateRow(&wmsg);
...@@ -1076,7 +1076,7 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { ...@@ -1076,7 +1076,7 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
.pTable = tsDbSdb, .pTable = tsDbSdb,
.pRow = pDb, .pRow = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeDropDbCb .fpRsp = mnodeDropDbCb
}; };
int32_t code = sdbDeleteRow(&wmsg); int32_t code = sdbDeleteRow(&wmsg);
......
...@@ -329,7 +329,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) { ...@@ -329,7 +329,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsMnodeSdb, .pTable = tsMnodeSdb,
.pRow = pMnode, .pRow = pMnode,
.fpWrite = mnodeCreateMnodeCb .fpRsp = mnodeCreateMnodeCb
}; };
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
......
...@@ -48,6 +48,13 @@ typedef enum { ...@@ -48,6 +48,13 @@ typedef enum {
SDB_STATUS_CLOSING = 2 SDB_STATUS_CLOSING = 2
} ESdbStatus; } ESdbStatus;
char *actStr[] = {
"insert",
"delete",
"update",
"invalid"
};
typedef struct SSdbTable { typedef struct SSdbTable {
char tableName[SDB_TABLE_LEN]; char tableName[SDB_TABLE_LEN];
ESdbTable tableId; ESdbTable tableId;
...@@ -140,18 +147,6 @@ static void *sdbGetObjKey(SSdbTable *pTable, void *key) { ...@@ -140,18 +147,6 @@ static void *sdbGetObjKey(SSdbTable *pTable, void *key) {
return key; return key;
} }
static char *sdbGetActionStr(int32_t action) {
switch (action) {
case SDB_ACTION_INSERT:
return "insert";
case SDB_ACTION_DELETE:
return "delete";
case SDB_ACTION_UPDATE:
return "update";
}
return "invalid";
}
static char *sdbGetKeyStr(SSdbTable *pTable, void *key) { static char *sdbGetKeyStr(SSdbTable *pTable, void *key) {
static char str[16]; static char str[16];
switch (pTable->keyType) { switch (pTable->keyType) {
...@@ -167,7 +162,7 @@ static char *sdbGetKeyStr(SSdbTable *pTable, void *key) { ...@@ -167,7 +162,7 @@ static char *sdbGetKeyStr(SSdbTable *pTable, void *key) {
} }
} }
static char *sdbGetObjStr(SSdbTable *pTable, void *key) { static char *sdbGetRowStr(SSdbTable *pTable, void *key) {
return sdbGetKeyStr(pTable, sdbGetObjKey(pTable, key)); return sdbGetKeyStr(pTable, sdbGetObjKey(pTable, key));
} }
...@@ -254,33 +249,28 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { ...@@ -254,33 +249,28 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
} }
FORCE_INLINE FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
assert(param); if (wparam == NULL) return;
SSWriteMsg * pWrite = param; SSWriteMsg *pWrite = wparam;
SMnodeMsg *pMsg = pWrite->pMsg; SMnodeMsg * pMsg = pWrite->pMsg;
if (code <= 0) pWrite->retCode = code;
int32_t processedCount = atomic_add_fetch_32(&pWrite->processedCount, 1);
if (processedCount <= 1) {
if (pMsg != NULL) {
sdbDebug("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, processedCount, code);
}
return;
}
if (pMsg != NULL) { if (code <= 0) pWrite->code = code;
sdbDebug("vgId:1, msg:%p is confirmed, code:%x", pMsg, code); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
if (count <= 1) {
if (pMsg != NULL) sdbTrace("vgId:1, msg:%p waiting for confirm, count:%d code:%x", pMsg, count, code);
return;
} else {
if (pMsg != NULL) sdbTrace("vgId:1, msg:%p is confirmed, code:%x", pMsg, code);
} }
// failed to forward, need revert insert // failed to forward, need revert insert
if (pWrite->retCode != TSDB_CODE_SUCCESS) { if (pWrite->code != TSDB_CODE_SUCCESS) {
SWalHead *pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; SWalHead *pHead = (SWalHead *)((char *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
int32_t action = pHead->msgType % 10; int32_t action = pHead->msgType % 10;
sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pWrite->pRow, sdbError("vgId:1, row:%p:%s hver:%" PRIu64 " action:%s, failed to foward since %s", pWrite->pRow,
sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode)); sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, actStr[action], tstrerror(pWrite->code));
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
// It's better to create a table in two stages, create it first and then set it success // It's better to create a table in two stages, create it first and then set it success
//sdbDeleteHash(pWrite->pTable, pWrite);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = pWrite->pTable, .pTable = pWrite->pTable,
...@@ -290,10 +280,10 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { ...@@ -290,10 +280,10 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
} }
} }
if (pWrite->fpWrite != NULL) { if (pWrite->fpRsp != NULL) {
pWrite->retCode = (*pWrite->fpWrite)(pMsg, pWrite->retCode); pWrite->code = (*pWrite->fpRsp)(pMsg, pWrite->code);
} }
dnodeSendRpcMWriteRsp(pMsg, pWrite->retCode); dnodeSendRpcMWriteRsp(pMsg, pWrite->code);
// if ahandle, means this func is called by sdb write // if ahandle, means this func is called by sdb write
if (ahandle == NULL) { if (ahandle == NULL) {
...@@ -449,7 +439,7 @@ void sdbIncRef(void *tparam, void *pRow) { ...@@ -449,7 +439,7 @@ void sdbIncRef(void *tparam, void *pRow) {
SSdbTable *pTable = tparam; SSdbTable *pTable = tparam;
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_add_fetch_32(pRefCount, 1); int32_t refCount = atomic_add_fetch_32(pRefCount, 1);
sdbTrace("vgId:1, sdb:%s, inc ref to key:%p:%s:%d", pTable->tableName, pRow, sdbGetObjStr(pTable, pRow), refCount); sdbTrace("vgId:1, sdb:%s, inc ref to row:%p:%s:%d", pTable->tableName, pRow, sdbGetRowStr(pTable, pRow), refCount);
} }
void sdbDecRef(void *tparam, void *pRow) { void sdbDecRef(void *tparam, void *pRow) {
...@@ -458,11 +448,11 @@ void sdbDecRef(void *tparam, void *pRow) { ...@@ -458,11 +448,11 @@ void sdbDecRef(void *tparam, void *pRow) {
SSdbTable *pTable = tparam; SSdbTable *pTable = tparam;
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("vgId:1, sdb:%s, dec ref to key:%p:%s:%d", pTable->tableName, pRow, sdbGetObjStr(pTable, pRow), refCount); sdbTrace("vgId:1, sdb:%s, dec ref to row:%p:%s:%d", pTable->tableName, pRow, sdbGetRowStr(pTable, pRow), refCount);
int32_t *updateEnd = pRow + pTable->refCountPos - 4; int32_t *updateEnd = pRow + pTable->refCountPos - 4;
if (refCount <= 0 && *updateEnd) { if (refCount <= 0 && *updateEnd) {
sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pRow, sdbGetObjStr(pTable, pRow), refCount); sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->tableName, pRow, sdbGetRowStr(pTable, pRow), refCount);
SSWriteMsg wmsg = {.pRow = pRow}; SSWriteMsg wmsg = {.pRow = pRow};
(*pTable->fpDestroy)(&wmsg); (*pTable->fpDestroy)(&wmsg);
} }
...@@ -523,12 +513,12 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) { ...@@ -523,12 +513,12 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
} }
sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->tableName, sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetObjStr(pTable, pWrite->pRow), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg); sdbGetRowStr(pTable, pWrite->pRow), pWrite->rowSize, pTable->numOfRows, pWrite->pMsg);
int32_t code = (*pTable->fpInsert)(pWrite); int32_t code = (*pTable->fpInsert)(pWrite);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->tableName, sdbError("vgId:1, sdb:%s, failed to insert key:%s to hash, remove it", pTable->tableName,
sdbGetObjStr(pTable, pWrite->pRow)); sdbGetRowStr(pTable, pWrite->pRow));
sdbDeleteHash(pTable, pWrite); sdbDeleteHash(pTable, pWrite);
} }
...@@ -540,7 +530,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { ...@@ -540,7 +530,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0; bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0;
if (!set) { if (!set) {
sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->tableName, sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->tableName,
sdbGetObjStr(pTable, pWrite->pRow)); sdbGetRowStr(pTable, pWrite->pRow));
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
} }
...@@ -559,7 +549,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { ...@@ -559,7 +549,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
atomic_sub_fetch_32(&pTable->numOfRows, 1); atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetObjStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg);
sdbDecRef(pTable, pWrite->pRow); sdbDecRef(pTable, pWrite->pRow);
...@@ -568,7 +558,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) { ...@@ -568,7 +558,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) { static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName, sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetObjStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg); sdbGetRowStr(pTable, pWrite->pRow), pTable->numOfRows, pWrite->pMsg);
(*pTable->fpUpdate)(pWrite); (*pTable->fpUpdate)(pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -594,12 +584,12 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { ...@@ -594,12 +584,12 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
if (pHead->version <= tsSdbMgmt.version) { if (pHead->version <= tsSdbMgmt.version) {
pthread_mutex_unlock(&tsSdbMgmt.mutex); pthread_mutex_unlock(&tsSdbMgmt.mutex);
sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version); pTable->tableName, actStr[action], sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbMgmt.version + 1) { } else if (pHead->version != tsSdbMgmt.version + 1) {
pthread_mutex_unlock(&tsSdbMgmt.mutex); pthread_mutex_unlock(&tsSdbMgmt.mutex);
sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version); pTable->tableName, actStr[action], sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbMgmt.version);
return TSDB_CODE_SYN_INVALID_VERSION; return TSDB_CODE_SYN_INVALID_VERSION;
} else { } else {
tsSdbMgmt.version = pHead->version; tsSdbMgmt.version = pHead->version;
...@@ -623,19 +613,19 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { ...@@ -623,19 +613,19 @@ static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
if (syncCode < 0) { if (syncCode < 0) {
sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, sdbError("vgId:1, sdb:%s, failed to forward req since %s action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); tstrerror(syncCode), actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
} else if (syncCode > 0) { } else if (syncCode > 0) {
sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, sdbDebug("vgId:1, sdb:%s, forward req is sent, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
} else { } else {
sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName, sdbTrace("vgId:1, sdb:%s, no need to send fwd req, action:%s key:%s hver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version, pWrite->pMsg);
} }
return syncCode; return syncCode;
} }
sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->tableName, sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC); syncForwardToPeer(tsSdbMgmt.sync, pHead, pWrite, TAOS_QTYPE_RPC);
...@@ -675,7 +665,7 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) { ...@@ -675,7 +665,7 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
if (sdbGetRowFromObj(pTable, pWrite->pRow)) { if (sdbGetRowFromObj(pTable, pWrite->pRow)) {
sdbError("vgId:1, sdb:%s, failed to insert key:%s, already exist", pTable->tableName, sdbError("vgId:1, sdb:%s, failed to insert key:%s, already exist", pTable->tableName,
sdbGetObjStr(pTable, pWrite->pRow)); sdbGetRowStr(pTable, pWrite->pRow));
sdbDecRef(pTable, pWrite->pRow); sdbDecRef(pTable, pWrite->pRow);
return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE;
} }
...@@ -712,9 +702,9 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { ...@@ -712,9 +702,9 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSWriteMsg *pNewOper = taosAllocateQitem(size); SSWriteMsg *pNewWrite = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK; SWalHead *pHead = (SWalHead *)((char *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->rowSize; pHead->len = pWrite->rowSize;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
...@@ -723,15 +713,15 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { ...@@ -723,15 +713,15 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
(*pTable->fpEncode)(pWrite); (*pTable->fpEncode)(pWrite);
pHead->len = pWrite->rowSize; pHead->len = pWrite->rowSize;
memcpy(pNewOper, pWrite, sizeof(SSWriteMsg)); memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg));
if (pNewOper->pMsg != NULL) { if (pNewWrite->pMsg != NULL) {
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, insert action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pWrite->pRow, sdbGetObjStr(pTable, pWrite->pRow)); pNewWrite->pMsg, pTable->tableName, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow));
} }
sdbIncRef(pNewOper->pTable, pNewOper->pRow); sdbIncRef(pNewWrite->pTable, pNewWrite->pRow);
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -781,9 +771,9 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { ...@@ -781,9 +771,9 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSWriteMsg *pNewOper = taosAllocateQitem(size); SSWriteMsg *pNewWrite = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK; SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
pHead->version = 0; pHead->version = 0;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
...@@ -791,14 +781,14 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { ...@@ -791,14 +781,14 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
(*pTable->fpEncode)(pWrite); (*pTable->fpEncode)(pWrite);
pHead->len = pWrite->rowSize; pHead->len = pWrite->rowSize;
memcpy(pNewOper, pWrite, sizeof(SSWriteMsg)); memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg));
if (pNewOper->pMsg != NULL) { if (pNewWrite->pMsg != NULL) {
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, delete action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pWrite->pRow, sdbGetObjStr(pTable, pWrite->pRow)); pNewWrite->pMsg, pTable->tableName, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow));
} }
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -836,9 +826,9 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { ...@@ -836,9 +826,9 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSWriteMsg *pNewOper = taosAllocateQitem(size); SSWriteMsg *pNewWrite = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSWriteMsg) + SDB_SYNC_HACK; SWalHead *pHead = (SWalHead *)((void *)pNewWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK);
pHead->version = 0; pHead->version = 0;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
...@@ -846,15 +836,15 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { ...@@ -846,15 +836,15 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
(*pTable->fpEncode)(pWrite); (*pTable->fpEncode)(pWrite);
pHead->len = pWrite->rowSize; pHead->len = pWrite->rowSize;
memcpy(pNewOper, pWrite, sizeof(SSWriteMsg)); memcpy(pNewWrite, pWrite, sizeof(SSWriteMsg));
if (pNewOper->pMsg != NULL) { if (pNewWrite->pMsg != NULL) {
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s, update action is add to sdb queue", pNewWrite->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pWrite->pRow, sdbGetObjStr(pTable, pWrite->pRow)); pNewWrite->pMsg, pTable->tableName, pWrite->pRow, sdbGetRowStr(pTable, pWrite->pRow));
} }
sdbIncRef(pNewOper->pTable, pNewOper->pRow); sdbIncRef(pNewWrite->pTable, pNewWrite->pRow);
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewWrite);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
...@@ -1071,7 +1061,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1071,7 +1061,7 @@ static void *sdbWorkerFp(void *pWorker) {
pWrite->processedCount = 1; pWrite->processedCount = 1;
pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
if (pWrite->pMsg != NULL) { if (pWrite->pMsg != NULL) {
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s row:%p:%s hver:%" PRIu64 ", will be processed in sdb queue",
pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->tableName, pWrite->pRow, pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->tableName, pWrite->pRow,
sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version); sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version);
} }
...@@ -1083,7 +1073,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1083,7 +1073,7 @@ static void *sdbWorkerFp(void *pWorker) {
int32_t code = sdbWrite(pWrite, pHead, qtype, NULL); int32_t code = sdbWrite(pWrite, pHead, qtype, NULL);
if (code > 0) code = 0; if (code > 0) code = 0;
if (pWrite) { if (pWrite) {
pWrite->retCode = code; pWrite->code = code;
} else { } else {
pHead->len = code; // hackway pHead->len = code; // hackway
} }
...@@ -1098,7 +1088,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1098,7 +1088,7 @@ static void *sdbWorkerFp(void *pWorker) {
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = (SSWriteMsg *)item; pWrite = (SSWriteMsg *)item;
sdbConfirmForward(NULL, pWrite, pWrite->retCode); sdbConfirmForward(NULL, pWrite, pWrite->code);
} else if (qtype == TAOS_QTYPE_FWD) { } else if (qtype == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
syncConfirmForward(tsSdbMgmt.sync, pHead->version, pHead->len); syncConfirmForward(tsSdbMgmt.sync, pHead->version, pHead->len);
......
...@@ -879,12 +879,12 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -879,12 +879,12 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
mnodeIncTableRef(pMsg->pTable); mnodeIncTableRef(pMsg->pTable);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.rowSize = sizeof(SSTableObj) + schemaSize, .rowSize = sizeof(SSTableObj) + schemaSize,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeCreateSuperTableCb .fpRsp = mnodeCreateSuperTableCb
}; };
int32_t code = sdbInsertRow(&wmsg); int32_t code = sdbInsertRow(&wmsg);
...@@ -942,7 +942,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -942,7 +942,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeDropSuperTableCb .fpRsp = mnodeDropSuperTableCb
}; };
int32_t code = sdbDeleteRow(&wmsg); int32_t code = sdbDeleteRow(&wmsg);
...@@ -1011,11 +1011,11 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t ...@@ -1011,11 +1011,11 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
schema[0].name); schema[0].name);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeAddSuperTableTagCb .fpRsp = mnodeAddSuperTableTagCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -1045,11 +1045,11 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { ...@@ -1045,11 +1045,11 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName); mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeDropSuperTableTagCb .fpRsp = mnodeDropSuperTableTagCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -1089,11 +1089,11 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c ...@@ -1089,11 +1089,11 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
oldTagName, newTagName); oldTagName, newTagName);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeModifySuperTableTagNameCb .fpRsp = mnodeModifySuperTableTagNameCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -1163,11 +1163,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1163,11 +1163,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeAddSuperTableColumnCb .fpRsp = mnodeAddSuperTableColumnCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -1208,11 +1208,11 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -1208,11 +1208,11 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeDropSuperTableColumnCb .fpRsp = mnodeDropSuperTableColumnCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -1252,11 +1252,11 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char ...@@ -1252,11 +1252,11 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
oldName, newName); oldName, newName);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pRow = pStable, .pRow = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeChangeSuperTableColumnCb .fpRsp = mnodeChangeSuperTableColumnCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -1902,11 +1902,11 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1902,11 +1902,11 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
} }
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pRow = pTable, .pRow = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeDropChildTableCb .fpRsp = mnodeDropChildTableCb
}; };
int32_t code = sdbDeleteRow(&wmsg); int32_t code = sdbDeleteRow(&wmsg);
...@@ -2006,11 +2006,11 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -2006,11 +2006,11 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pRow = pTable, .pRow = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeAlterNormalTableColumnCb .fpRsp = mnodeAlterNormalTableColumnCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -2039,11 +2039,11 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -2039,11 +2039,11 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName); mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pRow = pTable, .pRow = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeAlterNormalTableColumnCb .fpRsp = mnodeAlterNormalTableColumnCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -2076,11 +2076,11 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char ...@@ -2076,11 +2076,11 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
oldName, newName); oldName, newName);
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pRow = pTable, .pRow = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.fpWrite = mnodeAlterNormalTableColumnCb .fpRsp = mnodeAlterNormalTableColumnCb
}; };
return sdbUpdateRow(&wmsg); return sdbUpdateRow(&wmsg);
...@@ -2411,11 +2411,11 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2411,11 +2411,11 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SSWriteMsg desc = { SSWriteMsg desc = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pRow = pTable, .pRow = pTable,
.pTable = tsChildTableSdb, .pTable = tsChildTableSdb,
.pMsg = mnodeMsg, .pMsg = mnodeMsg,
.fpWrite = mnodeDoCreateChildTableCb .fpRsp = mnodeDoCreateChildTableCb
}; };
int32_t code = sdbInsertRowImp(&desc); int32_t code = sdbInsertRowImp(&desc);
......
...@@ -958,12 +958,12 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -958,12 +958,12 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received == mnodeMsg->successed) { if (mnodeMsg->received == mnodeMsg->successed) {
SSWriteMsg wmsg = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb, .pTable = tsVgroupSdb,
.pRow = pVgroup, .pRow = pVgroup,
.rowSize = sizeof(SVgObj), .rowSize = sizeof(SVgObj),
.pMsg = mnodeMsg, .pMsg = mnodeMsg,
.fpWrite = mnodeCreateVgroupCb .fpRsp = mnodeCreateVgroupCb
}; };
int32_t code = sdbInsertRowImp(&wmsg); int32_t code = sdbInsertRowImp(&wmsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册