未验证 提交 553df7ff 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #762 from taosdata/feature/slguan

Added some comments and solved some cluster bugs
...@@ -64,12 +64,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -64,12 +64,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
#ifdef CLUSTER #ifdef CLUSTER
if (ip && ip[0]) { if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipstr[0], ip); strcpy(tscMgmtIpList.ipstr[0], ip);
tscMgmtIpList.ip[0] = inet_addr(ip); tscMgmtIpList.ip[0] = inet_addr(ip);
strcpy(tscMgmtIpList.ipstr[1], ip);
tscMgmtIpList.ip[1] = inet_addr(ip);
} }
#else #else
if (ip && ip[0]) { if (ip && ip[0]) {
...@@ -157,7 +153,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -157,7 +153,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) { if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) {
#ifdef CLUSTER #ifdef CLUSTER
ip = tsPrivateIp; ip = tsMasterIp;
#else #else
ip = tsServerIpStr; ip = tsServerIpStr;
#endif #endif
......
...@@ -583,7 +583,7 @@ typedef struct { ...@@ -583,7 +583,7 @@ typedef struct {
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
uint8_t syncStatus; uint8_t syncStatus;
uint8_t reserved; uint8_t reserved[15];
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
...@@ -594,7 +594,12 @@ typedef struct { ...@@ -594,7 +594,12 @@ typedef struct {
// NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN/4 // NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN/4
typedef struct { typedef struct {
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
char db[TSDB_METER_ID_LEN+2]; // 8bytes align /*
* the message is too large, so it may will overwrite the cfg information in meterobj.v*
* recover to origin codes
*/
//char db[TSDB_METER_ID_LEN+2]; // 8bytes align
char db[TSDB_DB_NAME_LEN];
uint32_t vgId; uint32_t vgId;
int32_t maxSessions; int32_t maxSessions;
int32_t cacheBlockSize; int32_t cacheBlockSize;
......
...@@ -44,40 +44,6 @@ extern "C" { ...@@ -44,40 +44,6 @@ extern "C" {
#define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_MICRO_STR "us"
enum _vnode_status {
TSDB_VNODE_STATUS_OFFLINE,
TSDB_VNODE_STATUS_CREATING,
TSDB_VNODE_STATUS_UNSYNCED,
TSDB_VNODE_STATUS_SLAVE,
TSDB_VNODE_STATUS_MASTER,
TSDB_VNODE_STATUS_CLOSING,
TSDB_VNODE_STATUS_DELETING,
};
enum _vnode_sync_status {
STDB_SSTATUS_INIT,
TSDB_SSTATUS_SYNCING,
TSDB_SSTATUS_SYNC_CACHE,
TSDB_SSTATUS_SYNC_FILE,
};
enum _dnode_status {
TSDB_DNODE_STATUS_OFFLINE,
TSDB_DNODE_STATUS_READY
};
enum _dnode_balance_status {
LB_DNODE_STATE_BALANCED,
LB_DNODE_STATE_BALANCING,
LB_DNODE_STATE_OFFLINE_REMOVING,
LB_DNODE_STATE_SHELL_REMOVING
};
enum _vgroup_status {
LB_VGROUP_STATE_READY,
LB_VGROUP_STATE_UPDATE
};
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes #define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte #define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes #define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
......
...@@ -20,11 +20,73 @@ ...@@ -20,11 +20,73 @@
extern "C" { extern "C" {
#endif #endif
enum _TSDB_VG_STATUS {
TSDB_VG_STATUS_READY,
TSDB_VG_STATUS_IN_PROGRESS,
TSDB_VG_STATUS_COMMITLOG_INIT_FAILED,
TSDB_VG_STATUS_INIT_FAILED,
TSDB_VG_STATUS_FULL
};
enum _TSDB_DB_STATUS {
TSDB_DB_STATUS_READY,
TSDB_DB_STATUS_DROPPING,
TSDB_DB_STATUS_DROP_FROM_SDB
};
enum _TSDB_VN_STATUS {
TSDB_VN_STATUS_OFFLINE,
TSDB_VN_STATUS_CREATING,
TSDB_VN_STATUS_UNSYNCED,
TSDB_VN_STATUS_SLAVE,
TSDB_VN_STATUS_MASTER,
TSDB_VN_STATUS_CLOSING,
TSDB_VN_STATUS_DELETING,
};
enum _TSDB_VN_SYNC_STATUS {
TSDB_VN_SYNC_STATUS_INIT,
TSDB_VN_SYNC_STATUS_SYNCING,
TSDB_VN_SYNC_STATUS_SYNC_CACHE,
TSDB_VN_SYNC_STATUS_SYNC_FILE
};
enum _TSDB_VN_DROP_STATUS {
TSDB_VN_DROP_STATUS_READY,
TSDB_VN_DROP_STATUS_DROPPING
};
enum _TSDB_DN_STATUS {
TSDB_DN_STATUS_OFFLINE,
TSDB_DN_STATUS_READY
};
enum _TSDB_DN_LB_STATUS {
TSDB_DN_LB_STATUS_BALANCED,
TSDB_DN_LB_STATUS_BALANCING,
TSDB_DN_LB_STATUS_OFFLINE_REMOVING,
TSDB_DN_LB_STATE_SHELL_REMOVING
};
enum _TSDB_VG_LB_STATUS {
TSDB_VG_LB_STATUS_READY,
TSDB_VG_LB_STATUS_UPDATE
};
enum _TSDB_VN_STREAM_STATUS {
TSDB_VN_STREAM_STATUS_STOP,
TSDB_VN_STREAM_STATUS_START
};
const char* taosGetVgroupStatusStr(int vgroupStatus);
const char* taosGetDbStatusStr(int dbStatus);
const char* taosGetVnodeStatusStr(int vnodeStatus); const char* taosGetVnodeStatusStr(int vnodeStatus);
const char* taosGetDnodeStatusStr(int dnodeStatus);
const char* taosGetDnodeBalanceStateStr(int dnodeBalanceStatus);
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus); const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus);
const char* taosGetVnodeDropStatusStr(int dropping); const char* taosGetVnodeDropStatusStr(int dropping);
const char* taosGetDnodeStatusStr(int dnodeStatus);
const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus);
const char* taosGetVgroupLbStatusStr(int vglbStatus);
const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -63,7 +63,7 @@ ...@@ -63,7 +63,7 @@
#define HTTP_WRITE_RETRY_TIMES 500 #define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000 #define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 1000 #define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0 #define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2 #define HTTP_COMPRESS_GZIP 2
......
...@@ -107,9 +107,9 @@ void httpCleanUpContextTimer(HttpContext *pContext) { ...@@ -107,9 +107,9 @@ void httpCleanUpContextTimer(HttpContext *pContext) {
} }
void httpCleanUpContext(HttpContext *pContext) { void httpCleanUpContext(HttpContext *pContext) {
httpTrace("context:%p, start the clean up operation", pContext); httpTrace("context:%p, start the clean up operation, sig:%p", pContext, pContext->signature);
atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0); void *sig = atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0);
if (pContext->signature != NULL) { if (sig == NULL) {
httpTrace("context:%p is freed by another thread.", pContext); httpTrace("context:%p is freed by another thread.", pContext);
return; return;
} }
...@@ -527,8 +527,8 @@ void httpAcceptHttpConnection(void *arg) { ...@@ -527,8 +527,8 @@ void httpAcceptHttpConnection(void *arg) {
totalFds += pServer->pThreads[i].numOfFds; totalFds += pServer->pThreads[i].numOfFds;
} }
if (totalFds > tsHttpCacheSessions * 20) { if (totalFds > tsHttpCacheSessions * 100) {
httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*20, refuse connection", httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection",
connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions); connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions);
taosCloseSocket(connFd); taosCloseSocket(connFd);
continue; continue;
......
...@@ -89,7 +89,7 @@ int httpStartSystem() { ...@@ -89,7 +89,7 @@ int httpStartSystem() {
} }
if (httpServer->timerHandle == NULL) { if (httpServer->timerHandle == NULL) {
httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 20 + 100, 1000, 60000, "http"); httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 100 + 100, 200, 60000, "http");
} }
if (httpServer->timerHandle == NULL) { if (httpServer->timerHandle == NULL) {
httpError("http init timer failed"); httpError("http init timer failed");
......
...@@ -246,9 +246,9 @@ void monitorStopSystem() { ...@@ -246,9 +246,9 @@ void monitorStopSystem() {
} }
#ifdef CLUSTER #ifdef CLUSTER
monitorLPrint("dnode:%s is stopped", tsPrivateIp); monitorLPrint("dnode:%s monitor module is stopped", tsPrivateIp);
#else #else
monitorLPrint("dnode:%s is stopped", tsInternalIp); monitorLPrint("dnode:%s monitor module is stopped", tsInternalIp);
#endif #endif
monitor->state = MONITOR_STATE_STOPPED; monitor->state = MONITOR_STATE_STOPPED;
taosLogFp = NULL; taosLogFp = NULL;
......
...@@ -287,8 +287,10 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { ...@@ -287,8 +287,10 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
ssize_t sentbytes; ssize_t sentbytes;
while (leftbytes > 0) { while (leftbytes > 0) {
// TODO : Think to check if file is larger than 1GB /*
if (leftbytes > 1000000000) leftbytes = 1000000000; * TODO : Think to check if file is larger than 1GB
*/
//if (leftbytes > 1000000000) leftbytes = 1000000000;
sentbytes = sendfile(dfd, sfd, offset, leftbytes); sentbytes = sendfile(dfd, sfd, offset, leftbytes);
if (sentbytes == -1) { if (sentbytes == -1) {
if (errno == EINTR) { if (errno == EINTR) {
......
...@@ -506,6 +506,8 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -506,6 +506,8 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pConn->localPort = (int16_t)ntohs(sin.sin_port); pConn->localPort = (int16_t)ntohs(sin.sin_port);
} }
strcpy(pConn->label, label);
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) { if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd); taosCloseSocket(pConn->fd);
...@@ -513,7 +515,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -513,7 +515,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
return NULL; return NULL;
} }
strcpy(pConn->label, label);
pConn->shandle = shandle; pConn->shandle = shandle;
pConn->processData = fp; pConn->processData = fp;
pConn->index = i; pConn->index = i;
......
...@@ -24,7 +24,12 @@ ...@@ -24,7 +24,12 @@
extern char version[]; extern char version[];
const int16_t sdbFileVersion = 0; const int16_t sdbFileVersion = 0;
int sdbExtConns = 0; int sdbExtConns = 0;
#ifdef CLUSTER
int sdbMaster = 0;
#else
int sdbMaster = 1; int sdbMaster = 1;
#endif
void *(*sdbInitIndexFp[])(int maxRows, int dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash}; void *(*sdbInitIndexFp[])(int maxRows, int dataSize) = {sdbOpenStrHash, sdbOpenIntHash, sdbOpenIntHash};
...@@ -98,7 +103,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) { ...@@ -98,7 +103,7 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
pTable->header.swVersion = swVersion.iversion; pTable->header.swVersion = swVersion.iversion;
pTable->header.sdbFileVersion = sdbFileVersion; pTable->header.sdbFileVersion = sdbFileVersion;
if (taosCalcChecksumAppend(0, (uint8_t *)(&pTable->header), size) < 0) { if (taosCalcChecksumAppend(0, (uint8_t *)(&pTable->header), size) < 0) {
sdbError("failed to get file header checksum, file: %s", pTable->fn); sdbError("failed to get file header checksum, file:%s", pTable->fn);
tclose(pTable->fd); tclose(pTable->fd);
return -1; return -1;
} }
...@@ -120,17 +125,17 @@ int sdbOpenSdbFile(SSdbTable *pTable) { ...@@ -120,17 +125,17 @@ int sdbOpenSdbFile(SSdbTable *pTable) {
ssize_t tsize = read(pTable->fd, &(pTable->header), size); ssize_t tsize = read(pTable->fd, &(pTable->header), size);
if (tsize < size) { if (tsize < size) {
sdbError("failed to read sdb file header, file: %s", pTable->fn); sdbError("failed to read sdb file header, file:%s", pTable->fn);
tclose(pTable->fd); tclose(pTable->fd);
return -1; return -1;
} }
if (pTable->header.swVersion != swVersion.iversion) { if (pTable->header.swVersion != swVersion.iversion) {
sdbWarn("sdb file %s version not match software version", pTable->fn); sdbWarn("sdb file:%s version not match software version", pTable->fn);
} }
if (!taosCheckChecksumWhole((uint8_t *)(&pTable->header), size)) { if (!taosCheckChecksumWhole((uint8_t *)(&pTable->header), size)) {
sdbError("sdb file header is broken since checksum mismatch, file: %s", pTable->fn); sdbError("sdb file header is broken since checksum mismatch, file:%s", pTable->fn);
tclose(pTable->fd); tclose(pTable->fd);
return -1; return -1;
} }
...@@ -173,17 +178,19 @@ int sdbInitTableByFile(SSdbTable *pTable) { ...@@ -173,17 +178,19 @@ int sdbInitTableByFile(SSdbTable *pTable) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size); SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) { if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name); sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1; return -1;
} }
sdbTrace("open sdb file:%s for read", pTable->fn);
// Loop to read sdb file row by row // Loop to read sdb file row by row
while (1) { while (1) {
memset(rowHead, 0, total_size); memset(rowHead, 0, total_size);
bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
if (bytes < 0) { if (bytes < 0) {
sdbError("failed to read sdb file: %s", pTable->fn); sdbError("failed to read sdb file:%s", pTable->fn);
goto sdb_exit1; goto sdb_exit1;
} }
...@@ -196,8 +203,8 @@ int sdbInitTableByFile(SSdbTable *pTable) { ...@@ -196,8 +203,8 @@ int sdbInitTableByFile(SSdbTable *pTable) {
} }
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
sdbError("error row size in sdb file: %s rowSize: %d maxRowSize: %d", pTable->fn, rowHead->rowSize, sdbError("error row size in sdb file:%s, id:%d rowSize:%d maxRowSize:%d",
pTable->maxRowSize); pTable->fn, rowHead->id, rowHead->rowSize, pTable->maxRowSize);
pTable->size += sizeof(SRowHead); pTable->size += sizeof(SRowHead);
continue; continue;
} }
...@@ -208,13 +215,13 @@ int sdbInitTableByFile(SSdbTable *pTable) { ...@@ -208,13 +215,13 @@ int sdbInitTableByFile(SSdbTable *pTable) {
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
// TODO: Here may cause pTable->size not end of the file // TODO: Here may cause pTable->size not end of the file
sdbError("failed to read sdb file: %s id: %d rowSize: %d", pTable->fn, rowHead->id, rowHead->rowSize); sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize);
break; break;
} }
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
sdbError("error sdb checksum, sdb: %s id: %d, skip", pTable->name, rowHead->id); sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id);
pTable->size += real_size; pTable->size += real_size;
continue; continue;
} }
...@@ -225,7 +232,7 @@ int sdbInitTableByFile(SSdbTable *pTable) { ...@@ -225,7 +232,7 @@ int sdbInitTableByFile(SSdbTable *pTable) {
if (pMetaRow == NULL) { // New object if (pMetaRow == NULL) { // New object
if (rowHead->id < 0) { if (rowHead->id < 0) {
/* assert(0); */ /* assert(0); */
sdbError("error sdb negative id: %d, sdb: %s, skip", rowHead->id, pTable->name); sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name);
} else { } else {
rowMeta.id = rowHead->id; rowMeta.id = rowHead->id;
// TODO: Get rid of the rowMeta.offset and rowSize // TODO: Get rid of the rowMeta.offset and rowSize
...@@ -366,7 +373,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { ...@@ -366,7 +373,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
pTable->id++; pTable->id++;
sdbVersion++; sdbVersion++;
if (pTable->keyType == SDB_KEYTYPE_AUTO) { if (pTable->keyType == SDB_KEYTYPE_AUTO) {
// TODO: here need to change // TODO:here need to change
*((uint32_t *)pObj) = ++pTable->autoIndex; *((uint32_t *)pObj) = ++pTable->autoIndex;
(*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize)); (*(pTable->appTool))(SDB_TYPE_ENCODE, pObj, rowHead->data, pTable->maxRowSize, &(rowHead->rowSize));
} }
...@@ -376,7 +383,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { ...@@ -376,7 +383,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) {
rowHead->delimiter = SDB_DELIMITER; rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id; rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum while inserting, sdb: %s", pTable->name); sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead); tfree(rowHead);
return -1; return -1;
...@@ -470,7 +477,7 @@ int sdbDeleteRow(void *handle, void *row) { ...@@ -470,7 +477,7 @@ int sdbDeleteRow(void *handle, void *row) {
total_size = sizeof(SRowHead) + rowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + rowSize + sizeof(TSCKSUM);
rowHead = (SRowHead *)malloc(total_size); rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) { if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name); sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1; return -1;
} }
memset(rowHead, 0, total_size); memset(rowHead, 0, total_size);
...@@ -486,7 +493,7 @@ int sdbDeleteRow(void *handle, void *row) { ...@@ -486,7 +493,7 @@ int sdbDeleteRow(void *handle, void *row) {
rowHead->id = -(pTable->id); rowHead->id = -(pTable->id);
memcpy(rowHead->data, row, rowSize); memcpy(rowHead->data, row, rowSize);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) {
sdbError("failed to get checksum while inserting, sdb: %s", pTable->name); sdbError("failed to get checksum while inserting, sdb:%s", pTable->name);
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead); tfree(rowHead);
return -1; return -1;
...@@ -498,7 +505,7 @@ int sdbDeleteRow(void *handle, void *row) { ...@@ -498,7 +505,7 @@ int sdbDeleteRow(void *handle, void *row) {
sdbFinishCommit(pTable); sdbFinishCommit(pTable);
pTable->numOfRows--; pTable->numOfRows--;
// TODO: Change the update list here // TODO:Change the update list here
sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow); sdbAddIntoUpdateList(pTable, SDB_TYPE_DELETE, pMetaRow);
switch (pTable->keyType) { switch (pTable->keyType) {
case SDB_KEYTYPE_STRING: case SDB_KEYTYPE_STRING:
...@@ -556,7 +563,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { ...@@ -556,7 +563,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size); SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) { if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name); sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1; return -1;
} }
memset(rowHead, 0, total_size); memset(rowHead, 0, total_size);
...@@ -585,7 +592,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { ...@@ -585,7 +592,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) {
rowHead->delimiter = SDB_DELIMITER; rowHead->delimiter = SDB_DELIMITER;
rowHead->id = pTable->id; rowHead->id = pTable->id;
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError("failed to get checksum, sdb: %s id: %d", pTable->name, rowHead->id); sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id);
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
tfree(rowHead); tfree(rowHead);
return -1; return -1;
...@@ -640,7 +647,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { ...@@ -640,7 +647,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) {
if (pTable == NULL || row == NULL || rowSize <= 0) return -1; if (pTable == NULL || row == NULL || rowSize <= 0) return -1;
pMeta = sdbGetRowMeta(handle, row); pMeta = sdbGetRowMeta(handle, row);
if (pMeta == NULL) { if (pMeta == NULL) {
sdbTrace("table: %s, record is not there, batch update failed", pTable->name); sdbTrace("table:%s, record is not there, batch update failed", pTable->name);
return -1; return -1;
} }
...@@ -650,7 +657,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { ...@@ -650,7 +657,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size); SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) { if (rowHead == NULL) {
sdbError("failed to allocate row head memory, sdb: %s", pTable->name); sdbError("failed to allocate row head memory, sdb:%s", pTable->name);
return -1; return -1;
} }
...@@ -748,12 +755,14 @@ void sdbResetTable(SSdbTable *pTable) { ...@@ -748,12 +755,14 @@ void sdbResetTable(SSdbTable *pTable) {
return; return;
} }
sdbTrace("open sdb file:%s for update", pTable->fn);
while (1) { while (1) {
memset(rowHead, 0, total_size); memset(rowHead, 0, total_size);
bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
if (bytes < 0) { if (bytes < 0) {
sdbError("failed to read sdb file: %s", pTable->fn); sdbError("failed to read sdb file:%s", pTable->fn);
tfree(rowHead); tfree(rowHead);
return; return;
} }
...@@ -767,7 +776,7 @@ void sdbResetTable(SSdbTable *pTable) { ...@@ -767,7 +776,7 @@ void sdbResetTable(SSdbTable *pTable) {
} }
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
sdbError("error row size in sdb file: %s rowSize: %d maxRowSize: %d", pTable->fn, rowHead->rowSize, sdbError("error row size in sdb file:%s rowSize:%d maxRowSize:%d", pTable->fn, rowHead->rowSize,
pTable->maxRowSize); pTable->maxRowSize);
pTable->size += sizeof(SRowHead); pTable->size += sizeof(SRowHead);
continue; continue;
...@@ -775,13 +784,13 @@ void sdbResetTable(SSdbTable *pTable) { ...@@ -775,13 +784,13 @@ void sdbResetTable(SSdbTable *pTable) {
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
sdbError("failed to read sdb file: %s id: %d rowSize: %d", pTable->fn, rowHead->id, rowHead->rowSize); sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize);
break; break;
} }
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
sdbError("error sdb checksum, sdb: %s id: %d, skip", pTable->name, rowHead->id); sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id);
pTable->size += real_size; pTable->size += real_size;
continue; continue;
} }
...@@ -790,10 +799,10 @@ void sdbResetTable(SSdbTable *pTable) { ...@@ -790,10 +799,10 @@ void sdbResetTable(SSdbTable *pTable) {
pMetaRow = sdbGetRow(pTable, rowHead->data); pMetaRow = sdbGetRow(pTable, rowHead->data);
if (pMetaRow == NULL) { // New object if (pMetaRow == NULL) { // New object
if (rowHead->id < 0) { if (rowHead->id < 0) {
sdbError("error sdb negative id: %d, sdb: %s, skip", rowHead->id, pTable->name); sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name);
} else { } else {
rowMeta.id = rowHead->id; rowMeta.id = rowHead->id;
// TODO: Get rid of the rowMeta.offset and rowSize // TODO:Get rid of the rowMeta.offset and rowSize
rowMeta.offset = pTable->size; rowMeta.offset = pTable->size;
rowMeta.rowSize = rowHead->rowSize; rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, rowHead->data, rowHead->rowSize, NULL); rowMeta.row = (*(pTable->appTool))(SDB_TYPE_DECODE, NULL, rowHead->data, rowHead->rowSize, NULL);
...@@ -826,7 +835,7 @@ void sdbResetTable(SSdbTable *pTable) { ...@@ -826,7 +835,7 @@ void sdbResetTable(SSdbTable *pTable) {
sdbTrace("table:%s is updated, sdbVerion:%ld id:%ld", pTable->name, sdbVersion, pTable->id); sdbTrace("table:%s is updated, sdbVerion:%ld id:%ld", pTable->name, sdbVersion, pTable->id);
} }
// TODO: A problem here : use snapshot file to sync another node will cause // TODO:A problem here :use snapshot file to sync another node will cause
// problem // problem
void sdbSaveSnapShot(void *handle) { void sdbSaveSnapShot(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
...@@ -856,7 +865,7 @@ void sdbSaveSnapShot(void *handle) { ...@@ -856,7 +865,7 @@ void sdbSaveSnapShot(void *handle) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size); SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) { if (rowHead == NULL) {
sdbError("failed to allocate memory while saving SDB snapshot, sdb: %s", pTable->name); sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->name);
return; return;
} }
memset(rowHead, 0, size); memset(rowHead, 0, size);
......
...@@ -45,17 +45,6 @@ extern int mgmtShellConns; ...@@ -45,17 +45,6 @@ extern int mgmtShellConns;
extern int mgmtDnodeConns; extern int mgmtDnodeConns;
extern char mgmtDirectory[]; extern char mgmtDirectory[];
enum _TSDB_VG_STATUS {
TSDB_VG_STATUS_READY,
TSDB_VG_STATUS_IN_PROGRESS,
TSDB_VG_STATUS_COMMITLOG_INIT_FAILED,
TSDB_VG_STATUS_INIT_FAILED,
TSDB_VG_STATUS_FULL
};
enum _TSDB_DB_STATUS { TSDB_DB_STATUS_READY, TSDB_DB_STATUS_DROPPING, TSDB_DB_STATUS_DROP_FROM_SDB };
enum _TSDB_VN_STATUS { TSDB_VN_STATUS_READY, TSDB_VN_STATUS_DROPPING };
typedef struct { typedef struct {
uint32_t privateIp; uint32_t privateIp;
...@@ -86,7 +75,7 @@ typedef struct { ...@@ -86,7 +75,7 @@ typedef struct {
uint16_t slot; uint16_t slot;
int32_t customScore; // config by user int32_t customScore; // config by user
float lbScore; // calc in balance function float lbScore; // calc in balance function
int16_t lbState; // set in balance function int16_t lbStatus; // set in balance function
int16_t lastAllocVnode; // increase while create vnode int16_t lastAllocVnode; // increase while create vnode
SVnodeLoad vload[TSDB_MAX_VNODES]; SVnodeLoad vload[TSDB_MAX_VNODES];
char reserved[16]; char reserved[16];
...@@ -148,7 +137,7 @@ typedef struct _vg_obj { ...@@ -148,7 +137,7 @@ typedef struct _vg_obj {
int32_t numOfMeters; int32_t numOfMeters;
int32_t lbIp; int32_t lbIp;
int32_t lbTime; int32_t lbTime;
int8_t lbState; int8_t lbStatus;
char reserved[16]; char reserved[16];
char updateEnd[1]; char updateEnd[1];
struct _vg_obj *prev, *next; struct _vg_obj *prev, *next;
...@@ -426,7 +415,7 @@ void mgmtSetDnodeShellRemoving(SDnodeObj *pDnode); ...@@ -426,7 +415,7 @@ void mgmtSetDnodeShellRemoving(SDnodeObj *pDnode);
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
void mgmtStartBalanceTimer(int mseconds); void mgmtStartBalanceTimer(int mseconds);
void mgmtSetDnodeOfflineOnSdbChanged(); void mgmtSetDnodeOfflineOnSdbChanged();
void mgmtUpdateVgroupState(SVgObj *pVgroup, int lbState, int srcIp); void mgmtUpdateVgroupState(SVgObj *pVgroup, int lbStatus, int srcIp);
bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode); bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType); void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
......
...@@ -116,7 +116,7 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { ...@@ -116,7 +116,7 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
goto _over; goto _over;
} }
if (pVnode->syncStatus == TSDB_SSTATUS_SYNCING) { if (pVnode->syncStatus == TSDB_VN_SYNC_STATUS_SYNCING) {
code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen); code = vnodeSaveCreateMsgIntoQueue(pVnode, pMsg, msgLen);
dTrace("vid:%d, create msg is saved into sync queue", vid); dTrace("vid:%d, create msg is saved into sync queue", vid);
} else { } else {
...@@ -331,7 +331,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -331,7 +331,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return -1; return -1;
} }
if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_CREATING) { if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_CREATING) {
dTrace("vid:%d, vnode is still under creating", vnode); dTrace("vid:%d, vnode is still under creating", vnode);
return 0; return 0;
} }
...@@ -350,51 +350,52 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -350,51 +350,52 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
if (pCfg->replications > 0) { if (pCfg->replications > 0) {
dTrace("vid:%d, vpeer cfg received, replica:%d session:%d, vnodeList replica:%d session:%d", dPrint("vid:%d, vpeer cfg received, replica:%d session:%d, vnodeList replica:%d session:%d",
vnode, pCfg->replications, pCfg->maxSessions, vnodeList[vnode].cfg.replications, vnodeList[vnode].cfg.maxSessions); vnode, pCfg->replications, pCfg->maxSessions, vnodeList[vnode].cfg.replications, vnodeList[vnode].cfg.maxSessions);
for (i = 0; i < pCfg->replications; ++i) { for (i = 0; i < pCfg->replications; ++i) {
pMsg->vpeerDesc[i].vnode = htonl(pMsg->vpeerDesc[i].vnode); pMsg->vpeerDesc[i].vnode = htonl(pMsg->vpeerDesc[i].vnode);
pMsg->vpeerDesc[i].ip = htonl(pMsg->vpeerDesc[i].ip); pMsg->vpeerDesc[i].ip = htonl(pMsg->vpeerDesc[i].ip);
dTrace("vid:%d, vpeer:%d ip:0x%x vid:%d ", vnode, i, pMsg->vpeerDesc[i].ip, pMsg->vpeerDesc[i].vnode); dPrint("vid:%d, vpeer:%d ip:0x%x vid:%d ", vnode, i, pMsg->vpeerDesc[i].ip, pMsg->vpeerDesc[i].vnode);
} }
} }
if (vnodeList[vnode].cfg.maxSessions == 0) { if (vnodeList[vnode].cfg.maxSessions == 0) {
dTrace("vid:%d, vnode is empty", vnode); dPrint("vid:%d, vnode is empty", vnode);
if (pCfg->maxSessions > 0) { if (pCfg->maxSessions > 0) {
if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_OFFLINE) { if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_OFFLINE) {
dTrace("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dPrint("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc); return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc);
} else { } else {
dTrace("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dPrint("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS; return TSDB_CODE_INVALID_VNODE_STATUS;
} }
} }
} else { } else {
dTrace("vid:%d, vnode is not empty", vnode); dPrint("vid:%d, vnode is not empty", vnode);
if (pCfg->maxSessions > 0) { if (pCfg->maxSessions > 0) {
if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_DELETING) { if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_DELETING) {
dTrace("vid:%d, status:%s, wait vnode delete finished", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dPrint("vid:%d, status:%s, wait vnode delete finished", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
} else { } else {
dTrace("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dPrint("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeCleanUpOneVnode(vnode);
}
vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc);
vnodeSaveVnodeCfg(vnode, pCfg, pMsg->vpeerDesc);
/*
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeUpdateHeadFile(vnode, vnodeList[vnode].cfg.maxSessions, pCfg->maxSessions);
vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions;
vnodeOpenVnode(vnode);
}
*/
} }
/*
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeCleanUpOneVnode(vnode);
}
vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc);
vnodeSaveVnodeCfg(vnode, pCfg, pMsg->vpeerDesc);
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
vnodeUpdateHeadFile(vnode, vnodeList[vnode].cfg.maxSessions, pCfg->maxSessions);
vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions;
vnodeOpenVnode(vnode);
}
*/
return 0; return 0;
} else { } else {
dTrace("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dPrint("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
vnodeRemoveVnode(vnode); vnodeRemoveVnode(vnode);
} }
} }
......
...@@ -136,6 +136,8 @@ int dnodeInitSystem() { ...@@ -136,6 +136,8 @@ int dnodeInitSystem() {
return -1; return -1;
} }
vnodeInitMgmtIp();
tsPrintGlobalConfig(); tsPrintGlobalConfig();
dPrint("Server IP address is:%s", tsInternalIp); dPrint("Server IP address is:%s", tsInternalIp);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "mgmtUtil.h" #include "mgmtUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tstatus.h"
void *dbSdb = NULL; void *dbSdb = NULL;
int tsDbUpdateSize; int tsDbUpdateSize;
...@@ -141,7 +142,7 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) { ...@@ -141,7 +142,7 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) {
if (pCreate->cacheNumOfBlocks.fraction < 0) pCreate->cacheNumOfBlocks.fraction = tsAverageCacheBlocks; // if (pCreate->cacheNumOfBlocks.fraction < 0) pCreate->cacheNumOfBlocks.fraction = tsAverageCacheBlocks; //
//-1 for balance //-1 for balance
if (pCreate->replications <= 0 || pCreate->replications > TSDB_REPLICA_MAX_NUM) { if (pCreate->replications < 0 || pCreate->replications > TSDB_REPLICA_MAX_NUM) {
mTrace("invalid db option replications: %d", pCreate->replications); mTrace("invalid db option replications: %d", pCreate->replications);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }
...@@ -281,8 +282,8 @@ int mgmtSetDbDropping(SDbObj *pDb) { ...@@ -281,8 +282,8 @@ int mgmtSetDbDropping(SDbObj *pDb) {
if (pDnode == NULL) continue; if (pDnode == NULL) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus != TSDB_VN_STATUS_DROPPING) { if (pVload->dropStatus != TSDB_VN_DROP_STATUS_DROPPING) {
pVload->dropStatus = TSDB_VN_STATUS_DROPPING; pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name);
if (mgmtUpdateDnode(pDnode) < 0) { if (mgmtUpdateDnode(pDnode) < 0) {
...@@ -315,10 +316,10 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) { ...@@ -315,10 +316,10 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) {
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
if (pDnode == NULL) continue; if (pDnode == NULL) continue;
if (pDnode->status == TSDB_DNODE_STATUS_OFFLINE) continue; if (pDnode->status == TSDB_DN_STATUS_OFFLINE) continue;
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
if (pVload->dropStatus == TSDB_VN_STATUS_DROPPING) { if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name);
return false; return false;
} }
...@@ -444,7 +445,7 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { ...@@ -444,7 +445,7 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) { while (pVgroup != NULL) {
mgmtUpdateVgroupState(pVgroup, LB_VGROUP_STATE_UPDATE, 0); mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
if (oldReplicaNum < pDb->cfg.replications) { if (oldReplicaNum < pDb->cfg.replications) {
if (!mgmtAddVnode(pVgroup, NULL, NULL)) { if (!mgmtAddVnode(pVgroup, NULL, NULL)) {
mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId);
......
...@@ -44,20 +44,21 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { ...@@ -44,20 +44,21 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
pDnode->openVnodes = 0; pDnode->openVnodes = 0;
#ifdef CLUSTER #ifdef CLUSTER
pDnode->status = TSDB_DNODE_STATUS_OFFLINE; pDnode->status = TSDB_DN_STATUS_OFFLINE;
#else #else
pDnode->status = TSDB_DNODE_STATUS_READY; pDnode->status = TSDB_DN_STATUS_READY;
#endif #endif
} }
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
int totalVnodes = 0; int totalVnodes = 0;
mTrace("dnode:%s, begin calc free vnodes", taosIpStr(pDnode->privateIp));
for (int i = 0; i < pDnode->numOfVnodes; ++i) { for (int i = 0; i < pDnode->numOfVnodes; ++i) {
SVnodeLoad *pVload = pDnode->vload + i; SVnodeLoad *pVload = pDnode->vload + i;
if (pVload->vgId != 0) { if (pVload->vgId != 0) {
mTrace("dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s", mTrace("%d-dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s",
taosIpStr(pDnode->privateIp), i, pVload->vgId, totalVnodes, taosIpStr(pDnode->privateIp), i, pVload->vgId,
pVload->status, taosGetDnodeStatusStr(pVload->status), pVload->status, taosGetDnodeStatusStr(pVload->status),
pVload->dropStatus, taosGetVnodeDropStatusStr(pVload->dropStatus), pVload->dropStatus, taosGetVnodeDropStatusStr(pVload->dropStatus),
pVload->syncStatus, taosGetVnodeSyncStatusStr(pVload->syncStatus)); pVload->syncStatus, taosGetVnodeSyncStatusStr(pVload->syncStatus));
...@@ -66,7 +67,7 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) { ...@@ -66,7 +67,7 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
} }
pDnode->numOfFreeVnodes = pDnode->numOfVnodes - totalVnodes; pDnode->numOfFreeVnodes = pDnode->numOfVnodes - totalVnodes;
mTrace("dnode:%s, calc free vnodes, numOfVnodes:%d, numOfFreeVnodes:%d, totalVnodes:%d", mTrace("dnode:%s, numOfVnodes:%d, numOfFreeVnodes:%d, totalVnodes:%d",
taosIpStr(pDnode->privateIp), pDnode->numOfVnodes, pDnode->numOfFreeVnodes, totalVnodes); taosIpStr(pDnode->privateIp), pDnode->numOfVnodes, pDnode->numOfFreeVnodes, totalVnodes);
} }
...@@ -201,7 +202,7 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { ...@@ -201,7 +202,7 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetDnodeBalanceStateStr(pDnode->lbState)); strcpy(pWrite, taosGetDnodeLbStatusStr(pDnode->lbStatus));
cols++; cols++;
tinet_ntoa(ipstr, pDnode->publicIp); tinet_ntoa(ipstr, pDnode->publicIp);
......
...@@ -43,6 +43,11 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -43,6 +43,11 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SMeterCfgMsg *pCfg = (SMeterCfgMsg *)cont; SMeterCfgMsg *pCfg = (SMeterCfgMsg *)cont;
SVgObj * pVgroup; SVgObj * pVgroup;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
int vnode = htonl(pCfg->vnode); int vnode = htonl(pCfg->vnode);
int sid = htonl(pCfg->sid); int sid = htonl(pCfg->sid);
...@@ -51,6 +56,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -51,6 +56,7 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
pMsg = pStart; pMsg = pStart;
if (vnode < pObj->numOfVnodes) { if (vnode < pObj->numOfVnodes) {
...@@ -87,10 +93,18 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -87,10 +93,18 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *)cont; SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *)cont;
SVgObj * pVgroup = NULL; SVgObj * pVgroup = NULL;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_REDIRECT);
return 0;
}
int vnode = htonl(pCfg->vnode); int vnode = htonl(pCfg->vnode);
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP); pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP);
if (pStart == NULL) return 0; if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
}
pMsg = pStart; pMsg = pStart;
if (vnode < pObj->numOfVnodes) pVgroup = mgmtGetVgroup(pObj->vload[vnode].vgId); if (vnode < pObj->numOfVnodes) pVgroup = mgmtGetVgroup(pObj->vload[vnode].vgId);
...@@ -99,7 +113,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -99,7 +113,7 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
*pMsg = 0; *pMsg = 0;
pMsg++; pMsg++;
pMsg = mgmtBuildVpeersIe(pMsg, pVgroup, vnode); pMsg = mgmtBuildVpeersIe(pMsg, pVgroup, vnode);
mTrace("dnode:%s, vnode:%d, vgroup:%d, send create meter msg, code:%d", taosIpStr(pObj->privateIp), vnode, pVgroup->vgId, *pMsg); mTrace("dnode:%s, vnode:%d, vgroup:%d, send create vnode msg, code:%d", taosIpStr(pObj->privateIp), vnode, pVgroup->vgId, *pMsg);
} else { } else {
mTrace("dnode:%s, vnode:%d, no vgroup info, vgroup:%d", taosIpStr(pObj->privateIp), vnode, pObj->vload[vnode].vgId); mTrace("dnode:%s, vnode:%d, no vgroup info, vgroup:%d", taosIpStr(pObj->privateIp), vnode, pObj->vload[vnode].vgId);
*pMsg = TSDB_CODE_NOT_ACTIVE_VNODE; *pMsg = TSDB_CODE_NOT_ACTIVE_VNODE;
...@@ -121,6 +135,11 @@ int mgmtProcessFreeVnodeRsp(char *msg, int msgLen, SDnodeObj *pObj) { return 0; ...@@ -121,6 +135,11 @@ int mgmtProcessFreeVnodeRsp(char *msg, int msgLen, SDnodeObj *pObj) { return 0;
int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
STaosRsp *pRsp = (STaosRsp *)msg; STaosRsp *pRsp = (STaosRsp *)msg;
if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEERS_RSP, TSDB_CODE_REDIRECT);
return 0;
}
SDbObj *pDb = mgmtGetDb(pRsp->more); SDbObj *pDb = mgmtGetDb(pRsp->more);
if (!pDb) { if (!pDb) {
mError("dnode:%s, db not find, code:%d", taosIpStr(pObj->privateIp), pRsp->code); mError("dnode:%s, db not find, code:%d", taosIpStr(pObj->privateIp), pRsp->code);
...@@ -128,7 +147,8 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { ...@@ -128,7 +147,8 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
} }
if (pDb->vgStatus != TSDB_VG_STATUS_IN_PROGRESS) { if (pDb->vgStatus != TSDB_VG_STATUS_IN_PROGRESS) {
mTrace("dnode:%s, db:%s vpeer rsp already disposed, vgroup status:%d code:%d", taosIpStr(pObj->privateIp), pRsp->more, pDb->vgStatus, pRsp->code); mTrace("dnode:%s, db:%s vpeer rsp already disposed, vgroup status:%s code:%d",
taosIpStr(pObj->privateIp), pRsp->more, taosGetVgroupStatusStr(pDb->vgStatus), pRsp->code);
return 0; return 0;
} }
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "ttime.h" #include "ttime.h"
#include "vnodeTagMgmt.h" #include "vnodeTagMgmt.h"
#include "tstatus.h"
extern int64_t sdbVersion; extern int64_t sdbVersion;
...@@ -617,7 +618,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -617,7 +618,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) { if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) {
mgmtDestroyMeter(pMeter); mgmtDestroyMeter(pMeter);
mTrace("table:%s, vgroup in creating progress", pCreate->meterId); //mTrace("table:%s, vgroup in creating progress", pCreate->meterId);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
...@@ -661,7 +662,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -661,7 +662,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
pMeter->uid = (((uint64_t)pMeter->gid.vgId) << 40) + ((((uint64_t)pMeter->gid.sid) & ((1ul << 24) - 1ul)) << 16) + pMeter->uid = (((uint64_t)pMeter->gid.vgId) << 40) + ((((uint64_t)pMeter->gid.sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t)sdbVersion & ((1ul << 16) - 1ul)); ((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%d db:%s", mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%ld db:%s",
pMeter->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pMeter->uid, pDb->name); pMeter->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pMeter->uid, pDb->name);
} else { } else {
pMeter->uid = (((uint64_t)pMeter->createdTime) << 16) + ((uint64_t)sdbVersion & ((1ul << 16) - 1ul)); pMeter->uid = (((uint64_t)pMeter->createdTime) << 16) + ((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tstatus.h"
#pragma GCC diagnostic push #pragma GCC diagnostic push
...@@ -973,7 +974,10 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -973,7 +974,10 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
} }
} }
if (code != 0) { if (code == 1) {
//mTrace("table:%s, wait vgroup create finish", pCreate->meterId, code);
}
else if (code != 0) {
mError("table:%s, failed to create table, code:%d", pCreate->meterId, code); mError("table:%s, failed to create table, code:%d", pCreate->meterId, code);
} else { } else {
mTrace("table:%s, table is created by %s", pCreate->meterId, pConn->pUser->user); mTrace("table:%s, table is created by %s", pCreate->meterId, pConn->pUser->user);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "mgmt.h" #include "mgmt.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tlog.h" #include "tlog.h"
#include "tstatus.h"
void * vgSdb = NULL; void * vgSdb = NULL;
int tsVgUpdateSize; int tsVgUpdateSize;
...@@ -312,7 +313,7 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) ...@@ -312,7 +313,7 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pVgroup->lbState ? "updating" : "ready"); strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus));
cols++; cols++;
for (int i = 0; i < maxReplica; ++i) { for (int i = 0; i < maxReplica; ++i) {
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeCache.h" #include "vnodeCache.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "tstatus.h"
void vnodeSearchPointInCache(SMeterObj *pObj, SQuery *pQuery); void vnodeSearchPointInCache(SMeterObj *pObj, SQuery *pQuery);
void vnodeProcessCommitTimer(void *param, void *tmrId); void vnodeProcessCommitTimer(void *param, void *tmrId);
...@@ -77,7 +78,7 @@ void *vnodeOpenCachePool(int vnode) { ...@@ -77,7 +78,7 @@ void *vnodeOpenCachePool(int vnode) {
} }
} }
dTrace("vid:%d, cache pool is allocated:0x%x", vnode, pCachePool); dPrint("vid:%d, cache pool is allocated:0x%x", vnode, pCachePool);
return pCachePool; return pCachePool;
...@@ -102,7 +103,7 @@ void vnodeCloseCachePool(int vnode) { ...@@ -102,7 +103,7 @@ void vnodeCloseCachePool(int vnode) {
taosTmrStopA(&pVnode->commitTimer); taosTmrStopA(&pVnode->commitTimer);
if (pVnode->commitInProcess) pthread_cancel(pVnode->commitThread); if (pVnode->commitInProcess) pthread_cancel(pVnode->commitThread);
dTrace("vid:%d, cache pool closed, count:%d", vnode, pCachePool->count); dPrint("vid:%d, cache pool closed, count:%d", vnode, pCachePool->count);
int maxAllocBlock = (1024 * 1024 * 1024) / pVnode->cfg.cacheBlockSize; int maxAllocBlock = (1024 * 1024 * 1024) / pVnode->cfg.cacheBlockSize;
while (blockId < pVnode->cfg.cacheNumOfBlocks.totalBlocks) { while (blockId < pVnode->cfg.cacheNumOfBlocks.totalBlocks) {
...@@ -297,7 +298,7 @@ pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode) { ...@@ -297,7 +298,7 @@ pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode) {
taosTmrStopA(&pVnode->commitTimer); taosTmrStopA(&pVnode->commitTimer);
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_UNSYNCED) { if (pVnode->vnodeStatus == TSDB_VN_STATUS_UNSYNCED) {
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer); taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
dTrace("vid:%d, it is in unsyc state, commit later", pVnode->vnode); dTrace("vid:%d, it is in unsyc state, commit later", pVnode->vnode);
return pVnode->commitThread; return pVnode->commitThread;
......
...@@ -235,7 +235,7 @@ int vnodeInitCommit(int vnode) { ...@@ -235,7 +235,7 @@ int vnodeInitCommit(int vnode) {
} }
pVnode->pWrite += size; pVnode->pWrite += size;
dTrace("vid:%d, commit log is initialized", vnode); dPrint("vid:%d, commit log is initialized", vnode);
return 0; return 0;
} }
......
...@@ -125,10 +125,8 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam ...@@ -125,10 +125,8 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam
if (symlink(dDataName, dataName) != 0) return -1; if (symlink(dDataName, dataName) != 0) return -1;
if (symlink(dLastName, lastName) != 0) return -1; if (symlink(dLastName, lastName) != 0) return -1;
dTrace( dPrint("vid:%d, fileId:%d, empty header file:%s dataFile:%s lastFile:%s on disk:%s is created ",
"vid:%d, fileId:%d, empty header file:%s dataFile:%s lastFile:%s on " vnode, fileId, headName, dataName, lastName, path);
"disk:%s is created ",
vnode, fileId, headName, dataName, lastName, path);
return 0; return 0;
} }
...@@ -309,7 +307,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { ...@@ -309,7 +307,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
vnodeRecoverFromPeer(pVnode, fileId); vnodeRecoverFromPeer(pVnode, fileId);
goto _error; goto _error;
} else { } else {
dTrace("vid:%d, data file:%s is opened to write", vnode, name); dPrint("vid:%d, data file:%s is opened to write", vnode, name);
} }
// open last file // open last file
...@@ -414,7 +412,7 @@ void vnodeRemoveFile(int vnode, int fileId) { ...@@ -414,7 +412,7 @@ void vnodeRemoveFile(int vnode, int fileId) {
remove(dDataName); remove(dDataName);
remove(dLastName); remove(dLastName);
dTrace("vid:%d fileId:%d on disk: %s is removed, numOfFiles:%d maxFiles:%d", vnode, fileId, path, dPrint("vid:%d fileId:%d on disk: %s is removed, numOfFiles:%d maxFiles:%d", vnode, fileId, path,
pVnode->numOfFiles, pVnode->maxFiles); pVnode->numOfFiles, pVnode->maxFiles);
} }
......
...@@ -112,7 +112,7 @@ FILE *vnodeOpenMeterObjFile(int vnode) { ...@@ -112,7 +112,7 @@ FILE *vnodeOpenMeterObjFile(int vnode) {
fp = fopen(fileName, "r+"); fp = fopen(fileName, "r+");
if (fp != NULL) { if (fp != NULL) {
if (vnodeCheckFileIntegrity(fp) < 0) { if (vnodeCheckFileIntegrity(fp) < 0) {
dError("file:%s is corrupted, need to restore it first", fileName); dError("file:%s is corrupted, need to restore it first, exit program", fileName);
fclose(fp); fclose(fp);
// todo: how to recover // todo: how to recover
...@@ -376,7 +376,11 @@ int vnodeOpenMetersVnode(int vnode) { ...@@ -376,7 +376,11 @@ int vnodeOpenMetersVnode(int vnode) {
fseek(fp, TSDB_FILE_HEADER_LEN * 2 / 4, SEEK_SET); fseek(fp, TSDB_FILE_HEADER_LEN * 2 / 4, SEEK_SET);
fread(&pVnode->cfg, sizeof(SVnodeCfg), 1, fp); fread(&pVnode->cfg, sizeof(SVnodeCfg), 1, fp);
if (vnodeIsValidVnodeCfg(&pVnode->cfg) == false) { if (vnodeIsValidVnodeCfg(&pVnode->cfg) == false) {
dError("vid:%d, maxSessions:%d cacheBlockSize:%d replications:%d daysPerFile:%d daysToKeep:%d invalid, clear it",
vnode, pVnode->cfg.maxSessions, pVnode->cfg.cacheBlockSize, pVnode->cfg.replications,
pVnode->cfg.daysPerFile, pVnode->cfg.daysToKeep);
pVnode->cfg.maxSessions = 0; // error in vnode file pVnode->cfg.maxSessions = 0; // error in vnode file
return 0; return 0;
} }
...@@ -484,7 +488,7 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec) { ...@@ -484,7 +488,7 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec) {
vnodeSaveMeterObjToFile(pNew); vnodeSaveMeterObjToFile(pNew);
// vnodeCreateMeterMgmt(pNew, pSec); // vnodeCreateMeterMgmt(pNew, pSec);
vnodeCreateStream(pNew); vnodeCreateStream(pNew);
dTrace("vid:%d sid:%d id:%s, meterObj is created, uid:%ld", pNew->vnode, pNew->sid, pNew->meterId, pNew->uid); dTrace("vid:%d, sid:%d id:%s, meterObj is created, uid:%ld", pNew->vnode, pNew->sid, pNew->meterId, pNew->uid);
} }
return code; return code;
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "vnodeRead.h" #include "vnodeRead.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "vnodeStore.h" #include "vnodeStore.h"
#include "tstatus.h"
#pragma GCC diagnostic ignored "-Wint-conversion" #pragma GCC diagnostic ignored "-Wint-conversion"
extern int tsMaxQueues; extern int tsMaxQueues;
...@@ -90,7 +91,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -90,7 +91,7 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
// if ( vnodeList[vnode].status != TSDB_STATUS_MASTER && pMsg->msgType != TSDB_MSG_TYPE_RETRIEVE ) { // if ( vnodeList[vnode].status != TSDB_STATUS_MASTER && pMsg->msgType != TSDB_MSG_TYPE_RETRIEVE ) {
#ifdef CLUSTER #ifdef CLUSTER
if (vnodeList[vnode].vnodeStatus != TSDB_VNODE_STATUS_MASTER) { if (vnodeList[vnode].vnodeStatus != TSDB_VN_STATUS_MASTER) {
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].vnodeStatus); dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].vnodeStatus);
} else { } else {
...@@ -177,7 +178,7 @@ int vnodeOpenShellVnode(int vnode) { ...@@ -177,7 +178,7 @@ int vnodeOpenShellVnode(int vnode) {
return -1; return -1;
} }
dTrace("vid:%d, sessions:%d, shell is opened", vnode, pCfg->maxSessions); dPrint("vid:%d, sessions:%d, shell is opened", vnode, pCfg->maxSessions);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -59,7 +59,7 @@ static int vnodeInitStoreVnode(int vnode) { ...@@ -59,7 +59,7 @@ static int vnodeInitStoreVnode(int vnode) {
} }
pthread_mutex_init(&(pVnode->vmutex), NULL); pthread_mutex_init(&(pVnode->vmutex), NULL);
dTrace("vid:%d, storage initialized, version:%ld fileId:%d numOfFiles:%d", vnode, pVnode->version, pVnode->fileId, dPrint("vid:%d, storage initialized, version:%ld fileId:%d numOfFiles:%d", vnode, pVnode->version, pVnode->fileId,
pVnode->numOfFiles); pVnode->numOfFiles);
return 0; return 0;
...@@ -78,12 +78,12 @@ int vnodeOpenVnode(int vnode) { ...@@ -78,12 +78,12 @@ int vnodeOpenVnode(int vnode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!(pVnode->vnodeStatus == TSDB_VNODE_STATUS_OFFLINE || pVnode->vnodeStatus == TSDB_VNODE_STATUS_CREATING)) { if (!(pVnode->vnodeStatus == TSDB_VN_STATUS_OFFLINE || pVnode->vnodeStatus == TSDB_VN_STATUS_CREATING)) {
dError("vid:%d, status:%s, cannot enter open operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dError("vid:%d, status:%s, cannot enter open operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS; return TSDB_CODE_INVALID_VNODE_STATUS;
} }
dTrace("vid:%d, status:%s, start to open", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dPrint("vid:%d, status:%s, start to open", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
pthread_mutex_lock(&dmutex); pthread_mutex_lock(&dmutex);
// not enough memory, abort // not enough memory, abort
...@@ -104,7 +104,7 @@ int vnodeOpenVnode(int vnode) { ...@@ -104,7 +104,7 @@ int vnodeOpenVnode(int vnode) {
vnodeOpenStreams(pVnode, NULL); vnodeOpenStreams(pVnode, NULL);
#endif #endif
dTrace("vid:%d, vnode is opened, openVnodes:%d, status:%s", vnode, tsOpenVnodes, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dPrint("vid:%d, vnode is opened, openVnodes:%d, status:%s", vnode, tsOpenVnodes, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -140,12 +140,12 @@ static int vnodeCloseVnode(int vnode) { ...@@ -140,12 +140,12 @@ static int vnodeCloseVnode(int vnode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_DELETING) { if (pVnode->vnodeStatus == TSDB_VN_STATUS_DELETING) {
dTrace("vid:%d, status:%s, another thread performed delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dPrint("vid:%d, status:%s, another thread performed delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
dTrace("vid:%d, status:%s, enter close operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dPrint("vid:%d, status:%s, enter close operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
pVnode->vnodeStatus = TSDB_VNODE_STATUS_CLOSING; pVnode->vnodeStatus = TSDB_VN_STATUS_CLOSING;
} }
// set the meter is dropped flag // set the meter is dropped flag
...@@ -154,8 +154,8 @@ static int vnodeCloseVnode(int vnode) { ...@@ -154,8 +154,8 @@ static int vnodeCloseVnode(int vnode) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
dTrace("vid:%d, status:%s, enter delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dPrint("vid:%d, status:%s, enter delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
pVnode->vnodeStatus = TSDB_VNODE_STATUS_DELETING; pVnode->vnodeStatus = TSDB_VN_STATUS_DELETING;
vnodeCloseStream(vnodeList + vnode); vnodeCloseStream(vnodeList + vnode);
vnodeCancelCommit(vnodeList + vnode); vnodeCancelCommit(vnodeList + vnode);
...@@ -178,12 +178,12 @@ static int vnodeCloseVnode(int vnode) { ...@@ -178,12 +178,12 @@ static int vnodeCloseVnode(int vnode) {
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) { int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
char fileName[128]; char fileName[128];
if (vnodeList[vnode].vnodeStatus != TSDB_VNODE_STATUS_OFFLINE) { if (vnodeList[vnode].vnodeStatus != TSDB_VN_STATUS_OFFLINE) {
dError("vid:%d, status:%s, cannot enter create operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); dError("vid:%d, status:%s, cannot enter create operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
return TSDB_CODE_INVALID_VNODE_STATUS; return TSDB_CODE_INVALID_VNODE_STATUS;
} }
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_CREATING; vnodeList[vnode].vnodeStatus = TSDB_VN_STATUS_CREATING;
sprintf(fileName, "%s/vnode%d", tsDirectory, vnode); sprintf(fileName, "%s/vnode%d", tsDirectory, vnode);
mkdir(fileName, 0755); mkdir(fileName, 0755);
...@@ -235,7 +235,7 @@ static void vnodeRemoveDataFiles(int vnode) { ...@@ -235,7 +235,7 @@ static void vnodeRemoveDataFiles(int vnode) {
if (tcode >= 0) { if (tcode >= 0) {
remove(dfilePath); remove(dfilePath);
dTrace("Data file %s is removed, link file %s", dfilePath, linkFile); dPrint("Data file %s is removed, link file %s", dfilePath, linkFile);
} }
} else { } else {
remove(de->d_name); remove(de->d_name);
...@@ -250,7 +250,7 @@ static void vnodeRemoveDataFiles(int vnode) { ...@@ -250,7 +250,7 @@ static void vnodeRemoveDataFiles(int vnode) {
sprintf(vnodeDir, "%s/vnode%d", tsDirectory, vnode); sprintf(vnodeDir, "%s/vnode%d", tsDirectory, vnode);
rmdir(vnodeDir); rmdir(vnodeDir);
dTrace("vid:%d, vnode is removed!", vnode); dPrint("vid:%d, vnode is removed!", vnode);
} }
int vnodeRemoveVnode(int vnode) { int vnodeRemoveVnode(int vnode) {
...@@ -258,9 +258,9 @@ int vnodeRemoveVnode(int vnode) { ...@@ -258,9 +258,9 @@ int vnodeRemoveVnode(int vnode) {
if (vnodeList[vnode].cfg.maxSessions > 0) { if (vnodeList[vnode].cfg.maxSessions > 0) {
SVnodeObj* pVnode = &vnodeList[vnode]; SVnodeObj* pVnode = &vnodeList[vnode];
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_CREATING if (pVnode->vnodeStatus == TSDB_VN_STATUS_CREATING
|| pVnode->vnodeStatus == TSDB_VNODE_STATUS_OFFLINE || pVnode->vnodeStatus == TSDB_VN_STATUS_OFFLINE
|| pVnode->vnodeStatus == TSDB_VNODE_STATUS_DELETING) { || pVnode->vnodeStatus == TSDB_VN_STATUS_DELETING) {
dError("vid:%d, status:%s, cannot enter close/delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus)); dError("vid:%d, status:%s, cannot enter close/delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} else { } else {
...@@ -273,7 +273,7 @@ int vnodeRemoveVnode(int vnode) { ...@@ -273,7 +273,7 @@ int vnodeRemoveVnode(int vnode) {
} }
} else { } else {
dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions); dPrint("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions);
vnodeList[vnode].cfg.maxSessions = 0; //reset value vnodeList[vnode].cfg.maxSessions = 0; //reset value
vnodeCalcOpenVnodes(); vnodeCalcOpenVnodes();
} }
...@@ -325,7 +325,7 @@ void vnodeCleanUpOneVnode(int vnode) { ...@@ -325,7 +325,7 @@ void vnodeCleanUpOneVnode(int vnode) {
again = 1; again = 1;
if (vnodeList[vnode].pCachePool) { if (vnodeList[vnode].pCachePool) {
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_OFFLINE; vnodeList[vnode].vnodeStatus = TSDB_VN_STATUS_OFFLINE;
vnodeClosePeerVnode(vnode); vnodeClosePeerVnode(vnode);
} }
...@@ -354,7 +354,7 @@ void vnodeCleanUpVnodes() { ...@@ -354,7 +354,7 @@ void vnodeCleanUpVnodes() {
for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) { for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) {
if (vnodeList[vnode].pCachePool) { if (vnodeList[vnode].pCachePool) {
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_OFFLINE; vnodeList[vnode].vnodeStatus = TSDB_VN_STATUS_OFFLINE;
vnodeClosePeerVnode(vnode); vnodeClosePeerVnode(vnode);
} }
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#include "tstatus.h"
/* static TAOS *dbConn = NULL; */ /* static TAOS *dbConn = NULL; */
void vnodeCloseStreamCallback(void *param); void vnodeCloseStreamCallback(void *param);
...@@ -77,7 +78,7 @@ void vnodeOpenStreams(void *param, void *tmrId) { ...@@ -77,7 +78,7 @@ void vnodeOpenStreams(void *param, void *tmrId) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
SMeterObj *pObj; SMeterObj *pObj;
if (pVnode->streamRole == 0) return; if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
if (pVnode->meterList == NULL) return; if (pVnode->meterList == NULL) return;
taosTmrStopA(&pVnode->streamTimer); taosTmrStopA(&pVnode->streamTimer);
...@@ -116,7 +117,7 @@ void vnodeCreateStream(SMeterObj *pObj) { ...@@ -116,7 +117,7 @@ void vnodeCreateStream(SMeterObj *pObj) {
SVnodeObj *pVnode = vnodeList + pObj->vnode; SVnodeObj *pVnode = vnodeList + pObj->vnode;
if (pVnode->streamRole == 0) return; if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
if (pObj->pStream) return; if (pObj->pStream) return;
dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql); dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql);
...@@ -151,7 +152,7 @@ void vnodeRemoveStream(SMeterObj *pObj) { ...@@ -151,7 +152,7 @@ void vnodeRemoveStream(SMeterObj *pObj) {
// Close all streams in a vnode // Close all streams in a vnode
void vnodeCloseStream(SVnodeObj *pVnode) { void vnodeCloseStream(SVnodeObj *pVnode) {
SMeterObj *pObj; SMeterObj *pObj;
dTrace("vid:%d, stream is closed, old role:%d", pVnode->vnode, pVnode->streamRole); dPrint("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole));
// stop stream computing // stop stream computing
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
...@@ -168,17 +169,18 @@ void vnodeCloseStream(SVnodeObj *pVnode) { ...@@ -168,17 +169,18 @@ void vnodeCloseStream(SVnodeObj *pVnode) {
void vnodeUpdateStreamRole(SVnodeObj *pVnode) { void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
/* SMeterObj *pObj; */ /* SMeterObj *pObj; */
int newRole = (pVnode->vnodeStatus == TSDB_VNODE_STATUS_MASTER) ? 1 : 0; int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? TSDB_VN_STREAM_STATUS_START : TSDB_VN_STREAM_STATUS_STOP;
if (newRole != pVnode->streamRole) { if (newRole != pVnode->streamRole) {
dTrace("vid:%d, stream role is changed to:%d", pVnode->vnode, newRole); dPrint("vid:%d, stream role is changed from %s to %s",
pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole), taosGetVnodeStreamStatusStr(newRole));
pVnode->streamRole = newRole; pVnode->streamRole = newRole;
if (newRole) { if (newRole == TSDB_VN_STREAM_STATUS_START) {
vnodeOpenStreams(pVnode, NULL); vnodeOpenStreams(pVnode, NULL);
} else { } else {
vnodeCloseStream(pVnode); vnodeCloseStream(pVnode);
} }
} else { } else {
dTrace("vid:%d, stream role is keep to:%d", pVnode->vnode, newRole); dPrint("vid:%d, stream role is keep to %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole));
} }
} }
......
...@@ -90,4 +90,6 @@ void vnodeProcessMsgFromMgmtSpec(SSchedMsg *sched) { ...@@ -90,4 +90,6 @@ void vnodeProcessMsgFromMgmtSpec(SSchedMsg *sched) {
int vnodeInitMgmt() { return 0; } int vnodeInitMgmt() { return 0; }
void vnodeInitMgmtIp() {}
int vnodeSaveCreateMsgIntoQueue(SVnodeObj *pVnode, char *pMsg, int msgLen) { return 0; } int vnodeSaveCreateMsgIntoQueue(SVnodeObj *pVnode, char *pMsg, int msgLen) { return 0; }
\ No newline at end of file
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "tstatus.h"
void mgmtStartBalanceTimer(int mseconds) {} void mgmtStartBalanceTimer(int mseconds) {}
...@@ -25,10 +26,11 @@ void mgmtCleanupBalance() {} ...@@ -25,10 +26,11 @@ void mgmtCleanupBalance() {}
int mgmtAllocVnodes(SVgObj *pVgroup) { int mgmtAllocVnodes(SVgObj *pVgroup) {
int selectedVnode = -1; int selectedVnode = -1;
SDnodeObj *pDnode = &dnodeObj; SDnodeObj *pDnode = &dnodeObj;
int lastAllocVode = pDnode->lastAllocVnode;
for (int i = 0; i < pDnode->numOfVnodes; i++) { for (int i = 0; i < pDnode->numOfVnodes; i++) {
int vnode = (i + pDnode->lastAllocVnode) % pDnode->numOfVnodes; int vnode = (i + lastAllocVode) % pDnode->numOfVnodes;
if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_READY) { if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_OFFLINE) {
selectedVnode = vnode; selectedVnode = vnode;
break; break;
} }
...@@ -38,8 +40,7 @@ int mgmtAllocVnodes(SVgObj *pVgroup) { ...@@ -38,8 +40,7 @@ int mgmtAllocVnodes(SVgObj *pVgroup) {
mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes); mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes);
return -1; return -1;
} else { } else {
mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode);
pDnode->lastAllocVnode);
pVgroup->vnodeGid[0].vnode = selectedVnode; pVgroup->vnodeGid[0].vnode = selectedVnode;
pDnode->lastAllocVnode = selectedVnode + 1; pDnode->lastAllocVnode = selectedVnode + 1;
if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0;
...@@ -53,8 +54,8 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType) { ...@@ -53,8 +54,8 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType) {
bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; } bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; }
void mgmtUpdateDnodeState(SDnodeObj *pDnode, int lbState) {} void mgmtUpdateDnodeState(SDnodeObj *pDnode, int lbStatus) {}
void mgmtUpdateVgroupState(SVgObj *pVgroup, int lbState, int srcIp) {} void mgmtUpdateVgroupState(SVgObj *pVgroup, int lbStatus, int srcIp) {}
bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { return false; } bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { return false; }
\ No newline at end of file
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mgmt.h" #include "mgmt.h"
#include "tstatus.h"
SDnodeObj dnodeObj; SDnodeObj dnodeObj;
extern uint32_t tsRebootTime; extern uint32_t tsRebootTime;
...@@ -30,13 +31,13 @@ int mgmtInitDnodes() { ...@@ -30,13 +31,13 @@ int mgmtInitDnodes() {
dnodeObj.createdTime = (int64_t)tsRebootTime * 1000; dnodeObj.createdTime = (int64_t)tsRebootTime * 1000;
dnodeObj.lastReboot = tsRebootTime; dnodeObj.lastReboot = tsRebootTime;
dnodeObj.numOfCores = (uint16_t)tsNumOfCores; dnodeObj.numOfCores = (uint16_t)tsNumOfCores;
dnodeObj.status = TSDB_DNODE_STATUS_READY; dnodeObj.status = TSDB_DN_STATUS_READY;
dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
dnodeObj.thandle = (void*)(1); //hack way dnodeObj.thandle = (void*)(1); //hack way
if (dnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) { if (dnodeObj.numOfVnodes == TSDB_INVALID_VNODE_NUM) {
mgmtSetDnodeMaxVnodes(&dnodeObj); mgmtSetDnodeMaxVnodes(&dnodeObj);
mPrint("first access, set total vnodes:%d", dnodeObj.numOfVnodes); mPrint("dnode first access, set total vnodes:%d", dnodeObj.numOfVnodes);
} }
return 0; return 0;
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "tutil.h" #include "tutil.h"
#include "vnode.h" #include "vnode.h"
#include "tsystem.h" #include "tsystem.h"
#include "tstatus.h"
extern void *dmQhandle; extern void *dmQhandle;
void * mgmtStatusTimer = NULL; void * mgmtStatusTimer = NULL;
...@@ -82,7 +83,7 @@ void mgmtCleanUpDnodeInt() {} ...@@ -82,7 +83,7 @@ void mgmtCleanUpDnodeInt() {}
void mgmtProcessDnodeStatus(void *handle, void *tmrId) { void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
SDnodeObj *pObj = &dnodeObj; SDnodeObj *pObj = &dnodeObj;
pObj->openVnodes = tsOpenVnodes; pObj->openVnodes = tsOpenVnodes;
pObj->status = TSDB_DNODE_STATUS_READY; pObj->status = TSDB_DN_STATUS_READY;
float memoryUsedMB = 0; float memoryUsedMB = 0;
taosGetSysMemory(&memoryUsedMB); taosGetSysMemory(&memoryUsedMB);
...@@ -93,10 +94,10 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) { ...@@ -93,10 +94,10 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
SVnodeObj * pVnode = vnodeList + vnode; SVnodeObj * pVnode = vnodeList + vnode;
// wait vnode dropped // wait vnode dropped
if (pVload->dropStatus == TSDB_VN_STATUS_DROPPING) { if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
if (vnodeList[vnode].cfg.maxSessions <= 0) { if (vnodeList[vnode].cfg.maxSessions <= 0) {
pVload->dropStatus = TSDB_VN_STATUS_READY; pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
pVload->status = TSDB_VN_STATUS_READY; pVload->status = TSDB_VN_STATUS_OFFLINE;
mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode); mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, mgmtTmr); taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, mgmtTmr);
} }
...@@ -107,7 +108,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) { ...@@ -107,7 +108,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
} }
pVload->vnode = vnode; pVload->vnode = vnode;
pVload->status = TSDB_VN_STATUS_READY; pVload->status = TSDB_VN_STATUS_MASTER;
pVload->totalStorage = pVnode->vnodeStatistic.totalStorage; pVload->totalStorage = pVnode->vnodeStatistic.totalStorage;
pVload->compStorage = pVnode->vnodeStatistic.compStorage; pVload->compStorage = pVnode->vnodeStatistic.compStorage;
pVload->pointsWritten = pVnode->vnodeStatistic.pointsWritten; pVload->pointsWritten = pVnode->vnodeStatistic.pointsWritten;
...@@ -116,7 +117,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) { ...@@ -116,7 +117,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
SVgObj *pVgroup = mgmtGetVgroup(vgId); SVgObj *pVgroup = mgmtGetVgroup(vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("vgroup:%d is not there, but associated with vnode %d", vgId, vnode); mError("vgroup:%d is not there, but associated with vnode %d", vgId, vnode);
pVload->dropStatus = TSDB_VN_STATUS_DROPPING; pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
continue; continue;
} }
...@@ -126,9 +127,9 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) { ...@@ -126,9 +127,9 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
continue; continue;
} }
if (pVload->vgId == 0 || pVload->dropStatus == TSDB_VN_STATUS_DROPPING) { if (pVload->vgId == 0 || pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
mError("vid:%d, mgmt not exist, drop it", vnode); mError("vid:%d, mgmt not exist, drop it", vnode);
pVload->dropStatus = TSDB_VN_STATUS_DROPPING; pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING;
} }
} }
......
...@@ -97,7 +97,7 @@ int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) { ...@@ -97,7 +97,7 @@ int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) {
} }
_correct_exit: _correct_exit:
dTrace("vid: %d new header file %s is correct", pVnode->vnode, pVnode->nfn); dPrint("vid: %d new header file %s is correct", pVnode->vnode, pVnode->nfn);
tfree(pBlocks); tfree(pBlocks);
tfree(pHeader); tfree(pHeader);
return 0; return 0;
......
...@@ -31,8 +31,8 @@ void vnodeBroadcastStatusToUnsyncedPeer(SVnodeObj *pVnode) {} ...@@ -31,8 +31,8 @@ void vnodeBroadcastStatusToUnsyncedPeer(SVnodeObj *pVnode) {}
int vnodeOpenPeerVnode(int vnode) { int vnodeOpenPeerVnode(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode; SVnodeObj *pVnode = vnodeList + vnode;
pVnode->vnodeStatus = (pVnode->cfg.replications > 1) ? TSDB_VNODE_STATUS_UNSYNCED : TSDB_VNODE_STATUS_MASTER; pVnode->vnodeStatus = (pVnode->cfg.replications > 1) ? TSDB_VN_STATUS_UNSYNCED : TSDB_VN_STATUS_MASTER;
dTrace("vid:%d, status:%s numOfPeers:%d", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus), pVnode->cfg.replications - 1); dPrint("vid:%d, status:%s numOfPeers:%d", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus), pVnode->cfg.replications - 1);
vnodeUpdateStreamRole(pVnode); vnodeUpdateStreamRole(pVnode);
return 0; return 0;
} }
......
...@@ -15,52 +15,89 @@ ...@@ -15,52 +15,89 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tsdb.h" #include "tsdb.h"
#include "tstatus.h"
const char* taosGetVgroupStatusStr(int vgroupStatus) {
switch (vgroupStatus) {
case TSDB_VG_STATUS_READY: return "ready";
case TSDB_VG_STATUS_IN_PROGRESS: return "inprogress";
case TSDB_VG_STATUS_COMMITLOG_INIT_FAILED: return "commitlog_init_failed";
case TSDB_VG_STATUS_INIT_FAILED: return "init_failed";
case TSDB_VG_STATUS_FULL: return "full";
default: return "undefined";
}
}
const char* taosGetDbStatusStr(int dbStatus) {
switch (dbStatus) {
case TSDB_DB_STATUS_READY: return "ready";
case TSDB_DB_STATUS_DROPPING: return "dropping";
case TSDB_DB_STATUS_DROP_FROM_SDB: return "drop_from_sdb";
default: return "undefined";
}
}
const char* taosGetVnodeStatusStr(int vnodeStatus) { const char* taosGetVnodeStatusStr(int vnodeStatus) {
switch (vnodeStatus) { switch (vnodeStatus) {
case TSDB_VNODE_STATUS_OFFLINE:return "offline"; case TSDB_VN_STATUS_OFFLINE: return "offline";
case TSDB_VNODE_STATUS_CREATING: return "creating"; case TSDB_VN_STATUS_CREATING: return "creating";
case TSDB_VNODE_STATUS_UNSYNCED: return "unsynced"; case TSDB_VN_STATUS_UNSYNCED: return "unsynced";
case TSDB_VNODE_STATUS_SLAVE: return "slave"; case TSDB_VN_STATUS_SLAVE: return "slave";
case TSDB_VNODE_STATUS_MASTER: return "master"; case TSDB_VN_STATUS_MASTER: return "master";
case TSDB_VNODE_STATUS_CLOSING: return "closing"; case TSDB_VN_STATUS_CLOSING: return "closing";
case TSDB_VNODE_STATUS_DELETING: return "deleting"; case TSDB_VN_STATUS_DELETING: return "deleting";
default: return "undefined"; default: return "undefined";
}
}
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) {
switch (vnodeSyncStatus) {
case TSDB_VN_SYNC_STATUS_INIT: return "init";
case TSDB_VN_SYNC_STATUS_SYNCING: return "syncing";
case TSDB_VN_SYNC_STATUS_SYNC_CACHE: return "sync_cache";
case TSDB_VN_SYNC_STATUS_SYNC_FILE: return "sync_file";
default: return "undefined";
}
}
const char* taosGetVnodeDropStatusStr(int dropping) {
switch (dropping) {
case TSDB_VN_DROP_STATUS_READY: return "ready";
case TSDB_VN_DROP_STATUS_DROPPING: return "dropping";
default: return "undefined";
} }
} }
const char* taosGetDnodeStatusStr(int dnodeStatus) { const char* taosGetDnodeStatusStr(int dnodeStatus) {
switch (dnodeStatus) { switch (dnodeStatus) {
case TSDB_DNODE_STATUS_OFFLINE: return "offline"; case TSDB_DN_STATUS_OFFLINE: return "offline";
case TSDB_DNODE_STATUS_READY: return "ready"; case TSDB_DN_STATUS_READY: return "ready";
default: return "undefined"; default: return "undefined";
} }
} }
const char* taosGetDnodeBalanceStateStr(int dnodeBalanceStatus) { const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus) {
switch (dnodeBalanceStatus) { switch (dnodeBalanceStatus) {
case LB_DNODE_STATE_BALANCED: return "balanced"; case TSDB_DN_LB_STATUS_BALANCED: return "balanced";
case LB_DNODE_STATE_BALANCING: return "balancing"; case TSDB_DN_LB_STATUS_BALANCING: return "balancing";
case LB_DNODE_STATE_OFFLINE_REMOVING: return "offline removing"; case TSDB_DN_LB_STATUS_OFFLINE_REMOVING: return "offline removing";
case LB_DNODE_STATE_SHELL_REMOVING: return "removing"; case TSDB_DN_LB_STATE_SHELL_REMOVING: return "removing";
default: return "undefined"; default: return "undefined";
} }
} }
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) { const char* taosGetVgroupLbStatusStr(int vglbStatus) {
switch (vnodeSyncStatus) { switch (vglbStatus) {
case STDB_SSTATUS_INIT: return "init"; case TSDB_VG_LB_STATUS_READY: return "ready";
case TSDB_SSTATUS_SYNCING: return "syncing"; case TSDB_VG_LB_STATUS_UPDATE: return "updating";
case TSDB_SSTATUS_SYNC_CACHE: return "sync_cache"; default: return "undefined";
case TSDB_SSTATUS_SYNC_FILE: return "sync_file";
default: return "undefined";
} }
} }
const char* taosGetVnodeDropStatusStr(int dropping) { const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus) {
switch (dropping) { switch (vnodeStreamStatus) {
case 0: return "ready"; case TSDB_VN_STREAM_STATUS_START: return "start";
case 1: return "dropping"; case TSDB_VN_STREAM_STATUS_STOP: return "stop";
default: return "undefined"; default: return "undefined";
} }
} }
\ No newline at end of file
...@@ -490,7 +490,7 @@ char *taosIpStr(uint32_t ipInt) { ...@@ -490,7 +490,7 @@ char *taosIpStr(uint32_t ipInt) {
static int ipStrIndex = 0; static int ipStrIndex = 0;
char *ipStr = ipStrArray[(ipStrIndex++) % 3]; char *ipStr = ipStrArray[(ipStrIndex++) % 3];
sprintf(ipStr, "0x%x:%d.%d.%d.%d", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, ipInt >> 24); sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
return ipStr; return ipStr;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册