提交 ada94e72 编写于 作者: S slguan

[TBASE-1128]

上级 026d0b49
......@@ -583,7 +583,7 @@ typedef struct {
int64_t compStorage;
int64_t pointsWritten;
uint8_t syncStatus;
uint8_t reserved;
uint8_t reserved[15];
} SVnodeLoad;
typedef struct {
......
......@@ -24,7 +24,12 @@
extern char version[];
const int16_t sdbFileVersion = 0;
int sdbExtConns = 0;
#ifdef CLUSTER
int sdbMaster = 0;
#else
int sdbMaster = 1;
#endif
void *(*sdbInitIndexFp[])(int maxRows, int dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash};
......@@ -98,7 +103,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
pTable->header.swVersion = swVersion.iversion;
pTable->header.sdbFileVersion = sdbFileVersion;
if (taosCalcChecksumAppend(0, (uint8_t *)(&pTable->header), size) < 0) {
sdbError("failed to get file header checksum, file: %s", pTable->fn);
sdbError("failed to get file header checksum, file:%s", pTable->fn);
tclose(pTable->fd);
return -1;
}
......@@ -120,17 +125,17 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
ssize_t tsize = read(pTable->fd, &(pTable->header), size);
if (tsize < size) {
sdbError("failed to read sdb file header, file: %s", pTable->fn);
sdbError("failed to read sdb file header, file:%s", pTable->fn);
tclose(pTable->fd);
return -1;
}
if (pTable->header.swVersion != swVersion.iversion) {
sdbWarn("sdb file %s version not match software version", pTable->fn);
sdbWarn("sdb file:%s version not match software version", pTable->fn);
}
if (!taosCheckChecksumWhole((uint8_t *)(&pTable->header), size)) {
sdbError("sdb file header is broken since checksum mismatch, file: %s", pTable->fn);
sdbError("sdb file header is broken since checksum mismatch, file:%s", pTable->fn);
tclose(pTable->fd);
return -1;
}
......@@ -173,17 +178,19 @@ int sdbInitTableByFile(SSdbTable *pTable) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name);
sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1;
}
sdbTrace("open sdb file:%s for read", pTable->fn);
// Loop to read sdb file row by row
while (1) {
memset(rowHead, 0, total_size);
bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
if (bytes < 0) {
sdbError("failed to read sdb file: %s", pTable->fn);
sdbError("failed to read sdb file:%s", pTable->fn);
goto sdb_exit1;
}
......@@ -196,8 +203,8 @@ int sdbInitTableByFile(SSdbTable *pTable) {
}
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
sdbError("error row size in sdb file: %s rowSize: %d maxRowSize: %d", pTable->fn, rowHead->rowSize,
pTable->maxRowSize);
sdbError("error row size in sdb file:%s, id:%d rowSize:%d maxRowSize:%d",
pTable->fn, rowHead->id, rowHead->rowSize, pTable->maxRowSize);
pTable->size += sizeof(SRowHead);
continue;
}
......@@ -208,13 +215,13 @@ int sdbInitTableByFile(SSdbTable *pTable) {
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
// TODO: Here may cause pTable->size not end of the file
sdbError("failed to read sdb file: %s id: %d rowSize: %d", pTable->fn, rowHead->id, rowHead->rowSize);
sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize);
break;
}
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
sdbError("error sdb checksum, sdb: %s id: %d, skip", pTable->name, rowHead->id);
sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id);
pTable->size += real_size;
continue;
}
......@@ -225,7 +232,7 @@ int sdbInitTableByFile(SSdbTable *pTable) {
if (pMetaRow == NULL) { // New object
if (rowHead->id < 0) {
/* assert(0); */
sdbError("error sdb negative id: %d, sdb: %s, skip", rowHead->id, pTable->name);
sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name);
} else {
rowMeta.id = rowHead->id;
// TODO: Get rid of the rowMeta.offset and rowSize
......@@ -366,7 +373,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
pTable->id++;
sdbVersion++;
if (pTable->keyType == SDB_KEYTYPE_AUTO) {
// TODO: here need to change
// TODO:here need to change
*((uint32_t *)pObj) = ++pTable->autoIndex;
(*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize));
}
......@@ -376,7 +383,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum while inserting, sdb: %s", pTable->name);
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
......@@ -470,7 +477,7 @@ int sdbDeleteRow(void *handle, void *row) {
total_size = sizeof(SRowHead) + rowSize + sizeof(TSCKSUM);
rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name);
sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1;
}
memset(rowHead, 0, total_size);
......@@ -486,7 +493,7 @@ int sdbDeleteRow(void *handle, void *row) {
rowHead->id = -(pTable->id);
memcpy(rowHead->data, row, rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
sdbError("failed to get checksum while inserting, sdb: %s", pTable->name);
sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
......@@ -498,7 +505,7 @@ int sdbDeleteRow(void *handle, void *row) {
sdbFinishCommit(pTable);
pTable->numOfRows--;
// TODO: Change the update list here
// TODO:Change the update list here
sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow);
switch (pTable->keyType) {
case SDB_KEYTYPE_STRING:
......@@ -556,7 +563,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name);
sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1;
}
memset(rowHead, 0, total_size);
......@@ -585,7 +592,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb: %s id: %d", pTable->name, rowHead->id);
sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id);
pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead);
return -1;
......@@ -640,7 +647,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) {
if (pTable == NULL || row == NULL || rowSize <= 0) return -1;
pMeta = sdbGetRowMeta(handle, row);
if (pMeta == NULL) {
sdbTrace("table: %s, record is not there, batch update failed", pTable->name);
sdbTrace("table:%s, record is not there, batch update failed", pTable->name);
return -1;
}
......@@ -650,7 +657,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name);
sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1;
}
......@@ -748,12 +755,14 @@ void sdbResetTable(SSdbTable *pTable) {
return;
}
sdbTrace("open sdb file:%s for update", pTable->fn);
while (1) {
memset(rowHead, 0, total_size);
bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
if (bytes < 0) {
sdbError("failed to read sdb file: %s", pTable->fn);
sdbError("failed to read sdb file:%s", pTable->fn);
tfree(rowHead);
return;
}
......@@ -767,7 +776,7 @@ void sdbResetTable(SSdbTable *pTable) {
}
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
sdbError("error row size in sdb file: %s rowSize: %d maxRowSize: %d", pTable->fn, rowHead->rowSize,
sdbError("error row size in sdb file:%s rowSize:%d maxRowSize:%d", pTable->fn, rowHead->rowSize,
pTable->maxRowSize);
pTable->size += sizeof(SRowHead);
continue;
......@@ -775,13 +784,13 @@ void sdbResetTable(SSdbTable *pTable) {
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
sdbError("failed to read sdb file: %s id: %d rowSize: %d", pTable->fn, rowHead->id, rowHead->rowSize);
sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize);
break;
}
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
sdbError("error sdb checksum, sdb: %s id: %d, skip", pTable->name, rowHead->id);
sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id);
pTable->size += real_size;
continue;
}
......@@ -790,10 +799,10 @@ void sdbResetTable(SSdbTable *pTable) {
pMetaRow = sdbGetRow(pTable, rowHead->data);
if (pMetaRow == NULL) { // New object
if (rowHead->id < 0) {
sdbError("error sdb negative id: %d, sdb: %s, skip", rowHead->id, pTable->name);
sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name);
} else {
rowMeta.id = rowHead->id;
// TODO: Get rid of the rowMeta.offset and rowSize
// TODO:Get rid of the rowMeta.offset and rowSize
rowMeta.offset = pTable->size;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, rowHead->data, rowHead->rowSize, NULL);
......@@ -826,7 +835,7 @@ void sdbResetTable(SSdbTable *pTable) {
sdbTrace("table:%s is updated, sdbVerion:%ld id:%ld", pTable->name, sdbVersion, pTable->id);
}
// TODO: A problem here : use snapshot file to sync another node will cause
// TODO:A problem here :use snapshot file to sync another node will cause
// problem
void sdbSaveSnapShot(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle;
......@@ -856,7 +865,7 @@ void sdbSaveSnapShot(void *handle) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
sdbError("failed to allocate memory while saving SDB snapshot, sdb: %s", pTable->name);
sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->name);
return;
}
memset(rowHead, 0, size);
......
......@@ -43,6 +43,11 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SMeterCfgMsg *pCfg = (SMeterCfgMsg *)cont;
SVgObj * pVgroup;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
int vnode = htonl(pCfg->vnode);
int sid = htonl(pCfg->sid);
......@@ -51,6 +56,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
pMsg = pStart;
if (vnode < pObj->numOfVnodes) {
......@@ -87,10 +93,18 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *)cont;
SVgObj * pVgroup = NULL;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
int vnode = htonl(pCfg->vnode);
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP);
if (pStart == NULL) return 0;
if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
pMsg = pStart;
if (vnode < pObj->numOfVnodes) pVgroup = mgmtGetVgroup(pObj->vload[vnode].vgId);
......@@ -121,6 +135,11 @@ int mgmtProcessFreeVnodeRsp(char *msg, int msgLen, SDnodeObj *pObj) { return 0;
int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
STaosRsp *pRsp = (STaosRsp *)msg;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEERS_RSP, TSDB_CODE_REDIRECT);
return 0;
}
SDbObj *pDb = mgmtGetDb(pRsp->more);
if (!pDb) {
mError("dnode:%s, db not find, code:%d", taosIpStr(pObj->privateIp), pRsp->code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册