未验证 提交 d65a6745 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12031 from taosdata/feature/dnode

test: add unitest for sdb
...@@ -107,10 +107,9 @@ typedef enum { ...@@ -107,10 +107,9 @@ typedef enum {
typedef enum { typedef enum {
SDB_STATUS_INIT = 0, SDB_STATUS_INIT = 0,
SDB_STATUS_CREATING = 1, SDB_STATUS_CREATING = 1,
SDB_STATUS_UPDATING = 2, SDB_STATUS_DROPPING = 2,
SDB_STATUS_DROPPING = 3, SDB_STATUS_DROPPED = 3,
SDB_STATUS_READY = 4, SDB_STATUS_READY = 4,
SDB_STATUS_DROPPED = 5
} ESdbStatus; } ESdbStatus;
typedef enum { typedef enum {
......
...@@ -87,6 +87,7 @@ int32_t* taosGetErrno(); ...@@ -87,6 +87,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0113) #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0113)
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0114) #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115) #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
......
...@@ -72,6 +72,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -72,6 +72,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
NodeMsgFp msgFp = NULL; NodeMsgFp msgFp = NULL;
uint16_t msgType = pRpc->msgType; uint16_t msgType = pRpc->msgType;
bool needRelease = false; bool needRelease = false;
bool isReq = msgType & 1U;
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) { if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet); dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
...@@ -85,13 +86,13 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -85,13 +86,13 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER; if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
if (pWrapper->procType == DND_PROC_SINGLE) { if (pWrapper->procType == DND_PROC_SINGLE) {
dTrace("msg:%p, is created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user); dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == DND_PROC_PARENT) { } else if (pWrapper->procType == DND_PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg,
pRpc->handle, pMsg->user); TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen,
pRpc->refId, PROC_FUNC_REQ); (isReq && (pMsg->rpcMsg.code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1); ASSERT(1);
...@@ -100,12 +101,13 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -100,12 +101,13 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
_OVER: _OVER:
if (code == 0) { if (code == 0) {
if (pWrapper->procType == DND_PROC_PARENT) { if (pWrapper->procType == DND_PROC_PARENT) {
dTrace("msg:%p, is freed in parent process", pMsg); dTrace("msg:%p, freed in parent process", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
} else { } else {
dError("msg:%p, type:%s failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), code & 0XFFFF, terrstr()); dError("msg:%p, type:%s handle:%p failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), pRpc->handle,
code & 0XFFFF, terrstr());
if (msgType & 1U) { if (msgType & 1U) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) { if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
...@@ -359,29 +361,31 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ...@@ -359,29 +361,31 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
EProcFuncType ftype) { EProcFuncType ftype) {
int32_t code = pMsg->code & 0xFFFF;
pMsg->pCont = pCont; pMsg->pCont = pCont;
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle,
pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle);
switch (ftype) { if (ftype == PROC_FUNC_REQ) {
case PROC_FUNC_REGIST: dTrace("msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType),
pMsg->handle, code, pMsg->ahandle);
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
} else if (ftype == PROC_FUNC_RSP) {
dTrace("msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code, pMsg->ahandle);
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dmSendRpcRsp(pWrapper->pDnode, pMsg);
} else if (ftype == PROC_FUNC_REGIST) {
dTrace("msg:%p, get from parent queue, regist handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
pMsg->ahandle);
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
break; } else if (ftype == PROC_FUNC_RELEASE) {
case PROC_FUNC_RELEASE: dTrace("msg:%p, get from parent queue, release handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code,
pMsg->ahandle);
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont); rpcFreeCont(pCont);
break; } else {
case PROC_FUNC_REQ: dError("msg:%p, invalid ftype:%d while get from parent queue, handle:%p", pMsg, ftype, pMsg->handle);
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break;
case PROC_FUNC_RSP:
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dmSendRpcRsp(pWrapper->pDnode, pMsg);
break;
default:
break;
} }
taosMemoryFree(pMsg); taosMemoryFree(pMsg);
} }
......
...@@ -136,7 +136,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -136,7 +136,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// sync integration response // sync integration response
for (int i = 0; i < taosArrayGetSize(pArray); i++) { for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg * pRpc; SRpcMsg *pRpc;
pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
pRpc = &pMsg->rpcMsg; pRpc = &pMsg->rpcMsg;
...@@ -175,7 +175,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -175,7 +175,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg * pMsg = NULL; SNodeMsg *pMsg = NULL;
SRpcMsg rsp; SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
...@@ -218,7 +218,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -218,7 +218,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg * pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -231,7 +231,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -231,7 +231,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg * pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -248,7 +248,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -248,7 +248,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg * pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId); pHead->vgId = ntohl(pHead->vgId);
...@@ -262,23 +262,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp ...@@ -262,23 +262,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
int32_t code = 0; int32_t code = 0;
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, will be written into vnode-query queue", pMsg); dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pQueryQ, pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg);
break; break;
case FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg); dTrace("msg:%p, type:%s will be written into vnode-fetch queue", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pFetchQ, pMsg); taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case WRITE_QUEUE: case WRITE_QUEUE:
dTrace("msg:%p, will be written into vnode-write queue", pMsg); dTrace("msg:%p, type:%s will be written into vnode-write queue", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pWriteQ, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg);
break; break;
case SYNC_QUEUE: case SYNC_QUEUE:
dTrace("msg:%p, will be written into vnode-sync queue", pMsg); dTrace("msg:%p, type:%s will be written into vnode-sync queue", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pSyncQ, pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg);
break; break;
case MERGE_QUEUE: case MERGE_QUEUE:
dTrace("msg:%p, will be written into vnode-merge queue", pMsg); dTrace("msg:%p, type:%s will be written into vnode-merge queue", pMsg, TMSG_INFO(pRpc->msgType));
taosWriteQitem(pVnode->pMergeQ, pMsg); taosWriteQitem(pVnode->pMergeQ, pMsg);
break; break;
default: default:
...@@ -317,7 +317,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -317,7 +317,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt * pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->mgmtWorker; SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
...@@ -325,7 +325,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -325,7 +325,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt * pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -335,7 +335,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -335,7 +335,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHead * pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
......
...@@ -625,7 +625,7 @@ static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -625,7 +625,7 @@ static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
colDataAppend(pColInfo, numOfRows, b1, false); colDataAppend(pColInfo, numOfRows, b1, false);
const char *roles = syncStr(pObj->role); const char *roles = syncStr(pObj->role);
char *b2 = taosMemoryCalloc(1, strlen(roles) + VARSTR_HEADER_SIZE); char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
...@@ -679,8 +679,8 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre ...@@ -679,8 +679,8 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
memcpy(stbObj.pAst2, pCreate->pAst2, stbObj.ast2Len); memcpy(stbObj.pAst2, pCreate->pAst2, stbObj.ast2Len);
} }
stbObj.pColumns = taosMemoryMalloc(stbObj.numOfColumns * sizeof(SSchema)); stbObj.pColumns = taosMemoryCalloc(1, stbObj.numOfColumns * sizeof(SSchema));
stbObj.pTags = taosMemoryMalloc(stbObj.numOfTags * sizeof(SSchema)); stbObj.pTags = taosMemoryCalloc(1, stbObj.numOfTags * sizeof(SSchema));
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) { if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -1111,7 +1111,7 @@ static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD ...@@ -1111,7 +1111,7 @@ static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0; return 0;
} }
......
...@@ -31,8 +31,6 @@ extern "C" { ...@@ -31,8 +31,6 @@ extern "C" {
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
#define SDB_MAX_SIZE (32 * 1024)
typedef struct SSdbRaw { typedef struct SSdbRaw {
int8_t type; int8_t type;
int8_t status; int8_t status;
......
...@@ -28,12 +28,12 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -28,12 +28,12 @@ SSdb *sdbInit(SSdbOpt *pOption) {
return NULL; return NULL;
} }
char path[PATH_MAX + 100]; char path[PATH_MAX + 100] = {0};
snprintf(path, PATH_MAX + 100, "%s%sdata", pOption->path, TD_DIRSEP); snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP);
pSdb->currDir = strdup(path); pSdb->currDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP); snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP);
pSdb->syncDir = strdup(path); pSdb->syncDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%stmp", pOption->path, TD_DIRSEP); snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP);
pSdb->tmpDir = strdup(path); pSdb->tmpDir = strdup(path);
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) { if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
sdbCleanup(pSdb); sdbCleanup(pSdb);
...@@ -50,7 +50,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -50,7 +50,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
for (ESdbType i = 0; i < SDB_MAX; ++i) { for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]); taosInitRWLatch(&pSdb->locks[i]);
pSdb->maxId[i] = 0; pSdb->maxId[i] = 0;
pSdb->tableVer[i] = -1; pSdb->tableVer[i] = 0;
pSdb->keyTypes[i] = SDB_KEY_INT32; pSdb->keyTypes[i] = SDB_KEY_INT32;
} }
...@@ -99,7 +99,7 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -99,7 +99,7 @@ void sdbCleanup(SSdb *pSdb) {
taosHashClear(hash); taosHashClear(hash);
taosHashCleanup(hash); taosHashCleanup(hash);
pSdb->hashObjs[i] = NULL; pSdb->hashObjs[i] = NULL;
mDebug("sdb table:%d is cleaned up", i); mDebug("sdb table:%s is cleaned up", sdbTableName(i));
} }
taosMemoryFree(pSdb); taosMemoryFree(pSdb);
......
...@@ -50,7 +50,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -50,7 +50,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
} }
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = -1; int64_t maxId = 0;
ret = taosReadFile(pFile, &maxId, sizeof(int64_t)); ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -66,7 +66,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -66,7 +66,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
} }
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t ver = -1; int64_t ver = 0;
ret = taosReadFile(pFile, &ver, sizeof(int64_t)); ret = taosReadFile(pFile, &ver, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -102,7 +102,7 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -102,7 +102,7 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
} }
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = -1; int64_t maxId = 0;
if (i < SDB_MAX) { if (i < SDB_MAX) {
maxId = pSdb->maxId[i]; maxId = pSdb->maxId[i];
} }
...@@ -113,7 +113,7 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -113,7 +113,7 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
} }
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t ver = -1; int64_t ver = 0;
if (i < SDB_MAX) { if (i < SDB_MAX) {
ver = pSdb->tableVer[i]; ver = pSdb->tableVer[i];
} }
...@@ -165,6 +165,9 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -165,6 +165,9 @@ int32_t sdbReadFile(SSdb *pSdb) {
return -1; return -1;
} }
int64_t tableVer[SDB_MAX] = {0};
memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));
while (1) { while (1) {
readLen = sizeof(SSdbRaw); readLen = sizeof(SSdbRaw);
ret = taosReadFile(pFile, pRaw, readLen); ret = taosReadFile(pFile, pRaw, readLen);
...@@ -206,15 +209,16 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -206,15 +209,16 @@ int32_t sdbReadFile(SSdb *pSdb) {
code = sdbWriteWithoutFree(pSdb, pRaw); code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("failed to read file:%s since %s", file, terrstr()); mError("failed to read file:%s since %s", file, terrstr());
goto PARSE_SDB_DATA_ERROR; goto _OVER;
} }
} }
code = 0; code = 0;
pSdb->lastCommitVer = pSdb->curVer; pSdb->lastCommitVer = pSdb->curVer;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
PARSE_SDB_DATA_ERROR: _OVER:
taosCloseFile(&pFile); taosCloseFile(&pFile);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -259,7 +263,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -259,7 +263,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
if (pRow == NULL || pRow->status != SDB_STATUS_READY) { if (pRow == NULL) {
ppRow = taosHashIterate(hash, ppRow);
continue;
}
if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
sdbPrintOper(pSdb, pRow, "not-write");
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
continue; continue;
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow); static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow);
const char *sdbTableName(ESdbType type) { const char *sdbTableName(ESdbType type) {
switch (type) { switch (type) {
...@@ -65,12 +65,10 @@ const char *sdbTableName(ESdbType type) { ...@@ -65,12 +65,10 @@ const char *sdbTableName(ESdbType type) {
} }
} }
static const char *sdbStatusStr(ESdbStatus status) { static const char *sdbStatusName(ESdbStatus status) {
switch (status) { switch (status) {
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
return "creating"; return "creating";
case SDB_STATUS_UPDATING:
return "updating";
case SDB_STATUS_DROPPING: case SDB_STATUS_DROPPING:
return "dropping"; return "dropping";
case SDB_STATUS_READY: case SDB_STATUS_READY:
...@@ -89,13 +87,13 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { ...@@ -89,13 +87,13 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
if (keyType == SDB_KEY_BINARY) { if (keyType == SDB_KEY_BINARY) {
mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper, mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper,
pRow->pObj, sdbStatusStr(pRow->status)); pRow->pObj, sdbStatusName(pRow->status));
} else if (keyType == SDB_KEY_INT32) { } else if (keyType == SDB_KEY_INT32) {
mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount,
oper, pRow->pObj, sdbStatusStr(pRow->status)); oper, pRow->pObj, sdbStatusName(pRow->status));
} else if (keyType == SDB_KEY_INT64) { } else if (keyType == SDB_KEY_INT64) {
mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status)); pRow->refCount, oper, pRow->pObj, sdbStatusName(pRow->status));
} else { } else {
} }
} }
...@@ -116,7 +114,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { ...@@ -116,7 +114,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
} }
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
int32_t keySize; int32_t keySize = 0;
EKeyType keyType = pSdb->keyTypes[type]; EKeyType keyType = pSdb->keyTypes[type];
if (keyType == SDB_KEY_INT32) { if (keyType == SDB_KEY_INT32) {
...@@ -149,7 +147,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -149,7 +147,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
...@@ -183,18 +181,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -183,18 +181,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pNewRow->type]; SRWLatch *pLock = &pSdb->locks[pNewRow->type];
taosRLockLatch(pLock); taosWLockLatch(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
} }
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update"); sdbPrintOper(pSdb, pOldRow, "update");
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
int32_t code = 0; int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
...@@ -230,8 +228,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -230,8 +228,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
sdbCheck(pSdb, pOldRow); sdbCheckRow(pSdb, pOldRow);
// sdbRelease(pSdb, pOldRow->pObj);
return 0; return 0;
} }
...@@ -254,7 +251,6 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -254,7 +251,6 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize); code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
case SDB_STATUS_UPDATING:
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_DROPPING: case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize); code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
...@@ -295,7 +291,6 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -295,7 +291,6 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
switch (pRow->status) { switch (pRow->status) {
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_UPDATING:
atomic_add_fetch_32(&pRow->refCount, 1); atomic_add_fetch_32(&pRow->refCount, 1);
pRet = pRow->pObj; pRet = pRow->pObj;
sdbPrintOper(pSdb, pRow, "acquire"); sdbPrintOper(pSdb, pRow, "acquire");
...@@ -315,9 +310,9 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -315,9 +310,9 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
return pRet; return pRet;
} }
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosWLockLatch(pLock);
int32_t ref = atomic_load_32(&pRow->refCount); int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
...@@ -325,7 +320,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { ...@@ -325,7 +320,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
} }
void sdbRelease(SSdb *pSdb, void *pObj) { void sdbRelease(SSdb *pSdb, void *pObj) {
...@@ -335,7 +330,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -335,7 +330,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
if (pRow->type >= SDB_MAX) return; if (pRow->type >= SDB_MAX) return;
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosWLockLatch(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "release"); sdbPrintOper(pSdb, pRow, "release");
...@@ -343,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -343,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosRUnLockLatch(pLock); taosWUnLockLatch(pLock);
} }
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
...@@ -355,16 +350,6 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { ...@@ -355,16 +350,6 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SRWLatch *pLock = &pSdb->locks[type]; SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
#if 0
if (pIter != NULL) {
SSdbRow *pLastRow = *(SSdbRow **)pIter;
int32_t ref = atomic_load_32(&pLastRow->refCount);
if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pLastRow);
}
}
#endif
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) { while (ppRow != NULL) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
......
...@@ -134,6 +134,11 @@ int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) { ...@@ -134,6 +134,11 @@ int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) {
return -1; return -1;
} }
if (status == SDB_STATUS_INIT) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
pRaw->status = status; pRaw->status = status;
return 0; return 0;
} }
......
...@@ -190,7 +190,7 @@ int vnodeSyncCommit(SVnode *pVnode) { ...@@ -190,7 +190,7 @@ int vnodeSyncCommit(SVnode *pVnode) {
} }
int vnodeCommit(SVnode *pVnode) { int vnodeCommit(SVnode *pVnode) {
SVnodeInfo info; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
vInfo("vgId:%d start to commit, version: %" PRId64, TD_VID(pVnode), pVnode->state.applied); vInfo("vgId:%d start to commit, version: %" PRId64, TD_VID(pVnode), pVnode->state.applied);
......
...@@ -207,7 +207,7 @@ static bool addHandleToAcceptloop(void* arg); ...@@ -207,7 +207,7 @@ static bool addHandleToAcceptloop(void* arg);
SExHandle* exh2 = uvAcquireExHandle(refId); \ SExHandle* exh2 = uvAcquireExHandle(refId); \
if (exh2 == NULL || refId != exh2->refId) { \ if (exh2 == NULL || refId != exh2->refId) { \
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh1->refId, refId); \ exh2 ? exh2->refId : 0, refId); \
goto _return1; \ goto _return1; \
} \ } \
} else if (refId == 0) { \ } else if (refId == 0) { \
......
...@@ -93,6 +93,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not support ...@@ -93,6 +93,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not support
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization") TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
......
...@@ -310,6 +310,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) { ...@@ -310,6 +310,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) { int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) {
if (pHashObj == NULL || key == NULL || keyLen == 0) { if (pHashObj == NULL || key == NULL || keyLen == 0) {
terrno = TSDB_CODE_INVALID_PTR;
return -1; return -1;
} }
...@@ -378,6 +379,8 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo ...@@ -378,6 +379,8 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
} }
doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode);
} else {
terrno = TSDB_CODE_DUP_KEY;
} }
taosHashEntryWUnlock(pHashObj, pe); taosHashEntryWUnlock(pHashObj, pe);
......
...@@ -175,7 +175,6 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char ...@@ -175,7 +175,6 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
if (handle != 0 && ftype == PROC_FUNC_REQ) { if (handle != 0 && ftype == PROC_FUNC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) { if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex); taosThreadMutexUnlock(&pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
} }
...@@ -227,8 +226,8 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char ...@@ -227,8 +226,8 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
taosThreadMutexUnlock(&pQueue->mutex); taosThreadMutexUnlock(&pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d handle:%p ref:%" PRId64 ", head:%d %p body:%d %p",
pQueue->items, headLen, pHead, bodyLen, pBody); pQueue->name, pos, ftype, pQueue->items, (void *)handle, handleRef, headLen, pHead, bodyLen, pBody);
return 0; return 0;
} }
...@@ -455,25 +454,28 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -455,25 +454,28 @@ void taosProcCleanup(SProcObj *pProc) {
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t handleRef, EProcFuncType ftype) { void *handle, int64_t ref, EProcFuncType ftype) {
if (ftype != PROC_FUNC_REQ) { if (ftype != PROC_FUNC_REQ) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return -1; return -1;
} }
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, handleRef, return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
ftype);
} }
int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) { int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) {
int64_t h = (int64_t)handle; int64_t h = (int64_t)handle;
taosThreadMutexLock(&pProc->pChildQueue->mutex); taosThreadMutexLock(&pProc->pChildQueue->mutex);
int64_t *handleRef = taosHashGet(pProc->hash, &h, sizeof(int64_t)); int64_t *pRef = taosHashGet(pProc->hash, &h, sizeof(int64_t));
int64_t ref = 0;
if (pRef != NULL) {
ref = *pRef;
}
taosHashRemove(pProc->hash, &h, sizeof(int64_t)); taosHashRemove(pProc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&pProc->pChildQueue->mutex); taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
if (handleRef == NULL) return 0; return ref;
return *handleRef;
} }
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
......
...@@ -66,10 +66,10 @@ ...@@ -66,10 +66,10 @@
# --- stable # --- stable
./test.sh -f tsim/stable/disk.sim ./test.sh -f tsim/stable/disk.sim
#./test.sh -f tsim/stable/dnode3.sim ./test.sh -f tsim/stable/dnode3.sim
./test.sh -f tsim/stable/metrics.sim ./test.sh -f tsim/stable/metrics.sim
./test.sh -f tsim/stable/refcount.sim ./test.sh -f tsim/stable/refcount.sim
# ./test.sh -f tsim/stable/show.sim #./test.sh -f tsim/stable/show.sim
./test.sh -f tsim/stable/values.sim ./test.sh -f tsim/stable/values.sim
./test.sh -f tsim/stable/vnode3.sim ./test.sh -f tsim/stable/vnode3.sim
...@@ -81,7 +81,7 @@ ...@@ -81,7 +81,7 @@
./test.sh -f tsim/insert/backquote.sim -m ./test.sh -f tsim/insert/backquote.sim -m
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m ./test.sh -f tsim/query/interval-offset.sim -m
#./test.sh -f tsim/tmq/basic3.sim -m ./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m ./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m ./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m ./test.sh -f tsim/mnode/basic1.sim -m
......
...@@ -3,9 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1 ...@@ -3,9 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
# after mnode support, del sleep 2000, and check dnode status
sleep 2000
sql connect sql connect
#$loop_cnt = 0 #$loop_cnt = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册