From ada94e72d259d23ebc97af2246296f9f9870aabe Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 22 Nov 2019 19:23:49 +0800 Subject: [PATCH] [TBASE-1128] --- src/inc/taosmsg.h | 2 +- src/sdb/src/sdbEngine.c | 65 ++++++++++++++++------------ src/system/detail/src/mgmtDnodeInt.c | 21 ++++++++- 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e75404ad98..e59db1debf 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -583,7 +583,7 @@ typedef struct { int64_t compStorage; int64_t pointsWritten; uint8_t syncStatus; - uint8_t reserved; + uint8_t reserved[15]; } SVnodeLoad; typedef struct { diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index 326e9a4df2..e0a61f91c5 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -24,7 +24,12 @@ extern char version[]; const int16_t sdbFileVersion = 0; int sdbExtConns = 0; + +#ifdef CLUSTER +int sdbMaster = 0; +#else int sdbMaster = 1; +#endif void *(*sdbInitIndexFp[])(int maxRows, int dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash}; @@ -98,7 +103,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) { pTable->header.swVersion = swVersion.iversion; pTable->header.sdbFileVersion = sdbFileVersion; if (taosCalcChecksumAppend(0, (uint8_t *)(&pTable->header), size) < 0) { - sdbError("failed to get file header checksum, file: %s", pTable->fn); + sdbError("failed to get file header checksum, file:%s", pTable->fn); tclose(pTable->fd); return -1; } @@ -120,17 +125,17 @@ int sdbOpenSdbFile(SSdbTable *pTable) { ssize_t tsize = read(pTable->fd, &(pTable->header), size); if (tsize < size) { - sdbError("failed to read sdb file header, file: %s", pTable->fn); + sdbError("failed to read sdb file header, file:%s", pTable->fn); tclose(pTable->fd); return -1; } if (pTable->header.swVersion != swVersion.iversion) { - sdbWarn("sdb file %s version not match software version", pTable->fn); + sdbWarn("sdb file:%s version not match software version", pTable->fn); } if (!taosCheckChecksumWhole((uint8_t *)(&pTable->header), size)) { - sdbError("sdb file header is broken since checksum mismatch, file: %s", pTable->fn); + sdbError("sdb file header is broken since checksum mismatch, file:%s", pTable->fn); tclose(pTable->fd); return -1; } @@ -173,17 +178,19 @@ int sdbInitTableByFile(SSdbTable *pTable) { total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb: %s", pTable->name); + sdbError("failed to allocate row head memory, sdb:%s", pTable->name); return -1; } + sdbTrace("open sdb file:%s for read", pTable->fn); + // Loop to read sdb file row by row while (1) { memset(rowHead, 0, total_size); bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); if (bytes < 0) { - sdbError("failed to read sdb file: %s", pTable->fn); + sdbError("failed to read sdb file:%s", pTable->fn); goto sdb_exit1; } @@ -196,8 +203,8 @@ int sdbInitTableByFile(SSdbTable *pTable) { } if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { - sdbError("error row size in sdb file: %s rowSize: %d maxRowSize: %d", pTable->fn, rowHead->rowSize, - pTable->maxRowSize); + sdbError("error row size in sdb file:%s, id:%d rowSize:%d maxRowSize:%d", + pTable->fn, rowHead->id, rowHead->rowSize, pTable->maxRowSize); pTable->size += sizeof(SRowHead); continue; } @@ -208,13 +215,13 @@ int sdbInitTableByFile(SSdbTable *pTable) { bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { // TODO: Here may cause pTable->size not end of the file - sdbError("failed to read sdb file: %s id: %d rowSize: %d", pTable->fn, rowHead->id, rowHead->rowSize); + sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize); break; } real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { - sdbError("error sdb checksum, sdb: %s id: %d, skip", pTable->name, rowHead->id); + sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id); pTable->size += real_size; continue; } @@ -225,7 +232,7 @@ int sdbInitTableByFile(SSdbTable *pTable) { if (pMetaRow == NULL) { // New object if (rowHead->id < 0) { /* assert(0); */ - sdbError("error sdb negative id: %d, sdb: %s, skip", rowHead->id, pTable->name); + sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name); } else { rowMeta.id = rowHead->id; // TODO: Get rid of the rowMeta.offset and rowSize @@ -366,7 +373,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { pTable->id++; sdbVersion++; if (pTable->keyType == SDB_KEYTYPE_AUTO) { - // TODO: here need to change + // TODO:here need to change *((uint32_t *)pObj) = ++pTable->autoIndex; (*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); } @@ -376,7 +383,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { rowHead->delimiter = SDB_DELIMITER; rowHead->id = pTable->id; if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum while inserting, sdb: %s", pTable->name); + sdbError("failed to get checksum while inserting, sdb:%s", pTable->name); pthread_mutex_unlock(&pTable->mutex); tfree(rowHead); return -1; @@ -470,7 +477,7 @@ int sdbDeleteRow(void *handle, void *row) { total_size = sizeof(SRowHead) + rowSize + sizeof(TSCKSUM); rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb: %s", pTable->name); + sdbError("failed to allocate row head memory, sdb:%s", pTable->name); return -1; } memset(rowHead, 0, total_size); @@ -486,7 +493,7 @@ int sdbDeleteRow(void *handle, void *row) { rowHead->id = -(pTable->id); memcpy(rowHead->data, row, rowSize); if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { - sdbError("failed to get checksum while inserting, sdb: %s", pTable->name); + sdbError("failed to get checksum while inserting, sdb:%s", pTable->name); pthread_mutex_unlock(&pTable->mutex); tfree(rowHead); return -1; @@ -498,7 +505,7 @@ int sdbDeleteRow(void *handle, void *row) { sdbFinishCommit(pTable); pTable->numOfRows--; - // TODO: Change the update list here + // TODO:Change the update list here sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow); switch (pTable->keyType) { case SDB_KEYTYPE_STRING: @@ -556,7 +563,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb: %s", pTable->name); + sdbError("failed to allocate row head memory, sdb:%s", pTable->name); return -1; } memset(rowHead, 0, total_size); @@ -585,7 +592,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { rowHead->delimiter = SDB_DELIMITER; rowHead->id = pTable->id; if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum, sdb: %s id: %d", pTable->name, rowHead->id); + sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id); pthread_mutex_unlock(&pTable->mutex); tfree(rowHead); return -1; @@ -640,7 +647,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { if (pTable == NULL || row == NULL || rowSize <= 0) return -1; pMeta = sdbGetRowMeta(handle, row); if (pMeta == NULL) { - sdbTrace("table: %s, record is not there, batch update failed", pTable->name); + sdbTrace("table:%s, record is not there, batch update failed", pTable->name); return -1; } @@ -650,7 +657,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb: %s", pTable->name); + sdbError("failed to allocate row head memory, sdb:%s", pTable->name); return -1; } @@ -748,12 +755,14 @@ void sdbResetTable(SSdbTable *pTable) { return; } + sdbTrace("open sdb file:%s for update", pTable->fn); + while (1) { memset(rowHead, 0, total_size); bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); if (bytes < 0) { - sdbError("failed to read sdb file: %s", pTable->fn); + sdbError("failed to read sdb file:%s", pTable->fn); tfree(rowHead); return; } @@ -767,7 +776,7 @@ void sdbResetTable(SSdbTable *pTable) { } if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { - sdbError("error row size in sdb file: %s rowSize: %d maxRowSize: %d", pTable->fn, rowHead->rowSize, + sdbError("error row size in sdb file:%s rowSize:%d maxRowSize:%d", pTable->fn, rowHead->rowSize, pTable->maxRowSize); pTable->size += sizeof(SRowHead); continue; @@ -775,13 +784,13 @@ void sdbResetTable(SSdbTable *pTable) { bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { - sdbError("failed to read sdb file: %s id: %d rowSize: %d", pTable->fn, rowHead->id, rowHead->rowSize); + sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize); break; } real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { - sdbError("error sdb checksum, sdb: %s id: %d, skip", pTable->name, rowHead->id); + sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id); pTable->size += real_size; continue; } @@ -790,10 +799,10 @@ void sdbResetTable(SSdbTable *pTable) { pMetaRow = sdbGetRow(pTable, rowHead->data); if (pMetaRow == NULL) { // New object if (rowHead->id < 0) { - sdbError("error sdb negative id: %d, sdb: %s, skip", rowHead->id, pTable->name); + sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name); } else { rowMeta.id = rowHead->id; - // TODO: Get rid of the rowMeta.offset and rowSize + // TODO:Get rid of the rowMeta.offset and rowSize rowMeta.offset = pTable->size; rowMeta.rowSize = rowHead->rowSize; rowMeta.row = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, rowHead->data, rowHead->rowSize, NULL); @@ -826,7 +835,7 @@ void sdbResetTable(SSdbTable *pTable) { sdbTrace("table:%s is updated, sdbVerion:%ld id:%ld", pTable->name, sdbVersion, pTable->id); } -// TODO: A problem here : use snapshot file to sync another node will cause +// TODO:A problem here :use snapshot file to sync another node will cause // problem void sdbSaveSnapShot(void *handle) { SSdbTable *pTable = (SSdbTable *)handle; @@ -856,7 +865,7 @@ void sdbSaveSnapShot(void *handle) { total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate memory while saving SDB snapshot, sdb: %s", pTable->name); + sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->name); return; } memset(rowHead, 0, size); diff --git a/src/system/detail/src/mgmtDnodeInt.c b/src/system/detail/src/mgmtDnodeInt.c index 6390b7009f..a58f48ead1 100644 --- a/src/system/detail/src/mgmtDnodeInt.c +++ b/src/system/detail/src/mgmtDnodeInt.c @@ -43,6 +43,11 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SMeterCfgMsg *pCfg = (SMeterCfgMsg *)cont; SVgObj * pVgroup; + if (!sdbMaster) { + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_REDIRECT); + return 0; + } + int vnode = htonl(pCfg->vnode); int sid = htonl(pCfg->sid); @@ -51,6 +56,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); return 0; } + pMsg = pStart; if (vnode < pObj->numOfVnodes) { @@ -87,10 +93,18 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *)cont; SVgObj * pVgroup = NULL; + if (!sdbMaster) { + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_REDIRECT); + return 0; + } + int vnode = htonl(pCfg->vnode); pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP); - if (pStart == NULL) return 0; + if (pStart == NULL) { + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); + return 0; + } pMsg = pStart; if (vnode < pObj->numOfVnodes) pVgroup = mgmtGetVgroup(pObj->vload[vnode].vgId); @@ -121,6 +135,11 @@ int mgmtProcessFreeVnodeRsp(char *msg, int msgLen, SDnodeObj *pObj) { return 0; int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { STaosRsp *pRsp = (STaosRsp *)msg; + if (!sdbMaster) { + taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEERS_RSP, TSDB_CODE_REDIRECT); + return 0; + } + SDbObj *pDb = mgmtGetDb(pRsp->more); if (!pDb) { mError("dnode:%s, db not find, code:%d", taosIpStr(pObj->privateIp), pRsp->code); -- GitLab