提交 5c339fde 编写于 作者: S songzhao

new pika

上级 21e67a00
...@@ -32,6 +32,5 @@ log/ ...@@ -32,6 +32,5 @@ log/
# third party # third party
third/*
gdb.txt gdb.txt
tags tags
[submodule "third/leveldb"]
path = third/leveldb
url = https://github.com/google/leveldb.git
[submodule "third/glog"]
path = third/glog
url = https://github.com/google/glog.git
...@@ -2,26 +2,24 @@ CXX = g++ ...@@ -2,26 +2,24 @@ CXX = g++
CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__ -fPIC -Wno-unused-function CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__ -fPIC -Wno-unused-function
OBJECT = pika OBJECT = pika
SRC_DIR = ./src SRC_DIR = ./src
THIRD_PATH = ./third/ THIRD_PATH = ./third
OUTPUT = ./output OUTPUT = ./output
INCLUDE_PATH = -I./include/ \ INCLUDE_PATH = -I./include/ \
-I./src/ \ -I./src/ \
-I$(THIRD_PATH)/glog-0.3.3/src/ \ -I$(THIRD_PATH)/glog/src/ \
-I$(THIRD_PATH)/leveldb/include/ -I$(THIRD_PATH)/leveldb/include/
LIB_PATH = -L./ \ LIB_PATH = -L./ \
-L$(THIRD_PATH)/glog-0.3.3/ \
-L/usr/local/lib/ \
-L$(THIRD_PATH)/leveldb/ -L$(THIRD_PATH)/leveldb/
LIBS = -lpthread \ LIBS = -lpthread \
-lprotobuf \ -lglog \
-lleveldb -lleveldb
DYNAMIC_LIBS = -lglog GLOG = /usr/local/lib/libglog.a
.PHONY: all clean .PHONY: all clean
...@@ -45,8 +43,12 @@ all: $(OBJECT) ...@@ -45,8 +43,12 @@ all: $(OBJECT)
@echo "Success, go, go, go..." @echo "Success, go, go, go..."
$(OBJECT): $(OBJS) $(OBJECT): $(GLOG) $(OBJS)
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) -Wl,-Bdynamic $(LIBS) $(DYNAMIC_LIBS) make -C $(THIRD_PATH)/leveldb/
$(CXX) $(CXXFLAGS) -o $@ $(OBJS) $(INCLUDE_PATH) $(LIB_PATH) -Wl,-Bdynamic $(LIBS)
$(GLOG):
cd $(THIRD_PATH)/glog; ./configure; make; echo '*' > $(CURDIR)/third/glog/.gitignore; sudo make install;
$(OBJS): %.o : %.cc $(OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH) $(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)
......
...@@ -3,8 +3,7 @@ Pika ...@@ -3,8 +3,7 @@ Pika
Pika is a simple nosql database. Pika is a simple nosql database.
Pika used leveldb as storage engine. Pika used leveldb as storage engine.
The protocol used to contact with Pika is defined in google's proto buffer, The The protocol used to contact with Pika is redis protocol
proto buffer file is bada_sdk.proto
### Thread Model ### Thread Model
Pika used multi threads model. The main thread accept the request and then send Pika used multi threads model. The main thread accept the request and then send
......
message SdkInvalidOperation {
required int32 opcode = 1;
required int32 what = 2;
required bytes why = 3;
}
message SdkSet {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
required bytes value = 4;
optional int32 writesrc = 5 [default = 0];
}
message SdkSetRet {
required int32 opcode = 1;
required bool status = 2;
optional string master = 3;
}
message SdkCas {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
required bytes value = 4;
required int32 version = 5;
optional int32 writesrc = 6 [default = 0];
}
message SdkCasRet {
required int32 opcode = 1;
required bool status = 2;
optional string master = 3;
}
message SdkGet {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
}
message SdkGetRet {
required int32 opcode = 1;
required bytes value = 2;
optional string master = 3;
}
message SdkGetV {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
}
message SdkGetVRet {
required int32 opcode = 1;
required bytes value = 2;
required int32 version = 3;
optional string master = 4;
}
message SdkDelete {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
optional int32 writesrc = 4 [default = 0];
}
message SdkDeleteRet {
required int32 opcode = 1;
required bool status = 2;
optional string master = 3;
}
message SdkPing {
required int32 opcode = 1;
}
message SdkPingRet {
required int32 opcode = 1;
required bool status = 2;
}
message SdkGetIfAll {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
required bool Strict = 4;
}
message SdkGetIfAllRet {
required int32 opcode = 1;
required bytes value = 2;
required int32 version = 3;
optional string master = 4;
}
message SdkMGet {
required int32 opcode = 1;
required bytes table = 2;
repeated bytes key = 3;
}
message SdkMGetRet {
required int32 opcode = 1;
message KeyValue {
required bytes key = 1;
required bytes value = 2;
required int32 status = 3;
}
repeated KeyValue rets = 2;
optional string master = 3;
}
message GetMetadata4SDK2 {
required int32 cmd = 1;
required string table = 2;
}
message Rint32 {
required int32 id = 1;
}
message GetMetadata4SDK3 {
required int32 cmd = 1;
message NPs {
required string host = 1;
required int32 port = 2;
repeated Rint32 partitions = 3;
}
repeated NPs nps = 2;
required int32 pcnt = 3;
}
message GetPrimary4SDK2 {
required int32 cmd = 1;
required string table = 2;
required int32 partition = 3;
}
message GetPrimary4SDK3 {
required int32 cmd = 1;
required string host = 2;
required int32 port = 3;
}
message SdkHGet {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
required bytes key = 4;
}
message SdkHGetRet {
required int32 opcode = 1;
required bytes value = 2;
}
message SdkHPut {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
required bytes key = 4;
required bytes value = 5;
optional int32 writesrc = 6 [default = 0];
}
message SdkHPutRet {
required int32 opcode = 1;
required bool status = 2;
}
message SdkHDelete {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
required bytes key = 4;
optional int32 writesrc = 5 [default = 0];
}
message SdkHDeleteRet {
required int32 opcode = 1;
required bool status = 2;
}
message SdkHGetAllKeysByName {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
}
message SdkHGetAllKeysByNameRet {
required int32 opcode = 1;
repeated bytes keys = 2;
}
message SdkHGetAllKVsByName {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
}
message SdkHGetAllKVsByNameRet {
required int32 opcode = 1;
message HKeyValue {
required bytes key = 1;
required bytes value = 2;
}
repeated HKeyValue rets = 2;
}
message SdkHMput {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
message HKeyValue {
required bytes key = 1;
required bytes value = 2;
}
repeated HKeyValue kvs = 4;
optional int32 writesrc = 5 [default = 0];
}
message SdkHMputRet {
required int32 opcode = 1;
message HKeyRetval {
required bytes key = 1;
required int32 retval = 2;
}
repeated HKeyRetval krvs = 2;
}
message SdkHMget {
required int32 opcode = 1;
required bytes table = 2;
required bytes hname = 3;
repeated bytes keys = 4;
}
message SdkHMgetRet {
required int32 opcode = 1;
message HKeyValue {
required bytes key = 1;
required bytes value = 2;
required int32 retval = 3;
}
repeated HKeyValue kvs = 2;
}
message SdkSetWithTTL {
required int32 opcode = 1;
required bytes table = 2;
required bytes key = 3;
required bytes value = 4;
required int32 ttl = 5;
optional int32 writesrc = 6 [default = 0];
}
message SdkSetWithTTLRet {
required int32 opcode = 1;
required bool status = 2;
optional string master = 3;
}
message SdkLSize{
required int32 opcode = 1;
required bytes table = 2;
required bytes name = 3;
}
message SdkLSizeRet {
required int32 opcode = 1;
required uint64 size_r = 2;
}
message SdkLGet {
required int32 opcode = 1;
required bytes table = 2;
required bytes name = 3;
required int32 frontorback = 4;
}
message SdkLGetRet {
required int32 opcode = 1;
required bytes value = 2;
}
message SdkLPut {
required int32 opcode = 1;
required bytes table = 2;
required bytes name = 3;
required bytes value = 4;
required int32 frontorback = 5;
optional int32 writesrc = 6 [default = 0];
}
message SdkLPutRet {
required int32 opcode = 1;
required bool status = 2;
}
message SdkLPop{
required int32 opcode = 1;
required bytes table = 2;
required bytes name = 3;
required int32 frontorback = 4;
optional int32 writesrc = 5 [default = 0];
}
message SdkLPopRet {
required int32 opcode = 1;
required bytes value = 2;
}
message SdkLIndex {
required int32 opcode = 1;
required bytes table = 2;
required bytes name = 3;
required uint64 index = 4;
required int32 frontorback = 5;
}
message SdkLIndexRet {
required int32 opcode = 1;
required bytes value = 2;
}
message SdkLRange {
required int32 opcode = 1;
required bytes table = 2;
required bytes name = 3;
required uint64 from = 4;
required uint64 to = 5;
required int32 frontorback = 6;
}
message SdkLRangeRet {
required int32 opcode = 1;
repeated bytes value = 2;
}
/*
* The proto used in communicated in nodes
*/
message HbSend {
required int32 opcode = 1;
required bytes host = 2;
required int32 port = 3;
}
message HbSendRet {
required int32 opcode = 1;
required bool status = 2;
}
# Pika port # Bada port
port : 9221 port : 9221
# Thread Number # Thread Number
thread_num : 2 thread_num : 2
...@@ -6,12 +6,3 @@ thread_num : 2 ...@@ -6,12 +6,3 @@ thread_num : 2
log_path : ./log/ log_path : ./log/
# Pika glog level # Pika glog level
log_level : 1 log_level : 1
# Pika heartbeat port
hb_port : 9222
# The seed node
seed :
# The seed port
seed_port :
# Leveldb Data path
data_path : /tmp/testdb/
# Pika port
port : 9231
# Thread Number
thread_num : 2
# Pika log path
log_path : ./log/
# Pika glog level
log_level : 1
# Pika heartbeat port
hb_port : 9232
# The seed node
seed : 127.0.0.1
# The seed port
seed_port : 9222
# Leveldb Data path
data_path : /tmp/testdb1/
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include "stdlib.h" #include "stdlib.h"
#include "stdio.h" #include "stdio.h"
#include "xdebug.h" #include "xdebug.h"
#include "status.h"
class BaseConf class BaseConf
......
...@@ -32,13 +32,13 @@ ...@@ -32,13 +32,13 @@
/* #undef HAVE_LIBUNWIND_H */ /* #undef HAVE_LIBUNWIND_H */
/* define if you have google gflags library */ /* define if you have google gflags library */
/* #undef HAVE_LIB_GFLAGS */ #define HAVE_LIB_GFLAGS 1
/* define if you have google gmock library */ /* define if you have google gmock library */
/* #undef HAVE_LIB_GMOCK */ /* #undef HAVE_LIB_GMOCK */
/* define if you have google gtest library */ /* define if you have google gtest library */
/* #undef HAVE_LIB_GTEST */ #define HAVE_LIB_GTEST 1
/* define if you have libunwind */ /* define if you have libunwind */
/* #undef HAVE_LIB_UNWIND */ /* #undef HAVE_LIB_UNWIND */
...@@ -137,6 +137,9 @@ ...@@ -137,6 +137,9 @@
/* Define to the one symbol short name of this package. */ /* Define to the one symbol short name of this package. */
#define PACKAGE_TARNAME "glog" #define PACKAGE_TARNAME "glog"
/* Define to the home page for this package. */
#define PACKAGE_URL ""
/* Define to the version of this package. */ /* Define to the version of this package. */
#define PACKAGE_VERSION "0.3.3" #define PACKAGE_VERSION "0.3.3"
...@@ -157,7 +160,7 @@ ...@@ -157,7 +160,7 @@
#define STL_NAMESPACE std #define STL_NAMESPACE std
/* location of source code */ /* location of source code */
#define TEST_SRC_DIR "." #define TEST_SRC_DIR "./third/glog"
/* Version number of package */ /* Version number of package */
#define VERSION "0.3.3" #define VERSION "0.3.3"
......
/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _REDIS_FMACRO_H
#define _REDIS_FMACRO_H
#define _BSD_SOURCE
#if defined(__linux__)
#define _GNU_SOURCE_REDIS
#define _DEFAULT_SOURCE
#endif
#if defined(_AIX)
#define _ALL_SOURCE
#endif
#if defined(__linux__) || defined(__OpenBSD__)
#define _XOPEN_SOURCE 700
/*
* On NetBSD, _XOPEN_SOURCE undefines _NETBSD_SOURCE and
* thus hides inet_aton etc.
*/
#elif !defined(__NetBSD__)
#define _XOPEN_SOURCE
#endif
#if defined(__sun)
#define _POSIX_C_SOURCE 199506L
#endif
#define _LARGEFILE_SOURCE
#define _FILE_OFFSET_BITS 64
#endif
#ifndef __PIKA_HB_CONTEXT_H__
#define __PIKA_HB_CONTEXT_H_
#include "pika_define.h"
#include "fcntl.h"
#include "pika_define.h"
#include "status.h"
#include "csapp.h"
#include <errno.h>
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <stdint.h>
class HbContext {
public:
HbContext();
~HbContext();
char* GetContextBuffer(int *buf_len);
Status BuildObuf(int32_t opcode, const int packet_len);
Status HbBufferWrite();
Status HbBufferRead();
void set_fd(const int fd) { fd_ = fd; }
void set_flags(const int flags) { flags_ = flags; }
Status SetBlockType(BlockType);
Status SetTcpNoDelay();
int fd() { return fd_; }
int flags() { return flags_; }
const char *rbuf() { return rbuf_; }
int32_t rbuf_len() { return rbuf_len_; }
int32_t r_opcode() { return r_opcode_; }
// redisReader *reader; /* Protocol reader */
private:
int fd_;
int flags_;
Status HbBufferReadHeader(rio_t *rio);
Status HbBufferReadCode(rio_t *rio);
Status HbBufferReadPacket(rio_t *rio);
char *obuf_; /* Write buffer */
int32_t obuf_len_;
int header_len_;
char *rbuf_;
int32_t rbuf_len_;
int32_t r_opcode_;
HbContext(const HbContext&);
void operator = (const HbContext&);
};
#endif
#ifndef __PIKA_H__
#define __PIKA_H__
#include <stdio.h>
#include <pthread.h>
#include <vector>
class PikaMeta;
class PikaWorker;
class PikaHb;
class PikaHbMonitor;
class PikaNode;
class Pika
{
public:
Pika();
~Pika();
/*
* Start the heartbeat thread
* Start heartbeat will listen to the hb_port and start receive
* connection
*/
int RunHb();
int RunHbMonitor();
/*
* Start the main worker thread
* The worker must be the last thread to start
*/
int RunWorker();
static void* StartWorker(void* args);
/*
* Start the pulse thread, pulse thread will connect all the node that
* has connected.
*/
int RunPulse();
static void* StartPulse(void* args);
PikaWorker* pikaWorker() { return pikaWorker_; }
PikaHb* pikaHb() { return pikaHb_; }
private:
std::vector<PikaNode> preNode_;
/*
* The thread contain the meta infomation
*/
PikaMeta* pikaMeta_;
pthread_t workerId_, hbId_;
/*
* The accept thread, accept user request, and redirect conn to worker
* thread
*/
PikaWorker* pikaWorker_;
/*
* The heartbeat thread, accept other node heartbeat, and redirect to hbworker thred
*/
PikaHb* pikaHb_;
PikaHbMonitor* pikaHbMonitor_;
// No copying allowed
Pika(const Pika&);
void operator=(const Pika&);
};
#endif
...@@ -4,36 +4,22 @@ ...@@ -4,36 +4,22 @@
#include "stdlib.h" #include "stdlib.h"
#include "stdio.h" #include "stdio.h"
#include "xdebug.h" #include "xdebug.h"
#include "status.h"
#include "base_conf.h" #include "base_conf.h"
class PikaConf : public BaseConf class PikaConf : public BaseConf
{ {
public: public:
PikaConf(const char* path); PikaConf(const char* path);
/*
* The repetion return variable warpper
* remember to add the initial getConf* in the constructer
*/
int port() { return port_; } int port() { return port_; }
int thread_num() { return thread_num_; } int thread_num() { return thread_num_; }
char* log_path() { return log_path_; } char* log_path() { return log_path_; }
int log_level() { return log_level_; } int log_level() { return log_level_; }
int hb_port() { return hb_port_; }
char* seed() { return seed_; }
int seed_port() { return seed_port_; }
char* data_path() { return data_path_; }
private: private:
int port_; int port_;
int hb_port_;
int thread_num_; int thread_num_;
char log_path_[PIKA_WORD_SIZE]; char log_path_[PIKA_WORD_SIZE];
int log_level_; int log_level_;
char seed_[PIKA_WORD_SIZE];
int seed_port_;
char data_path_[PIKA_WORD_SIZE];
}; };
#endif #endif
...@@ -5,54 +5,43 @@ ...@@ -5,54 +5,43 @@
#include "csapp.h" #include "csapp.h"
#include "pika_thread.h" #include "pika_thread.h"
#include "pika_define.h" #include "pika_define.h"
#include "sds.h"
#include <list>
class PikaConn class PikaConn
{ {
public: public:
PikaConn(int fd); PikaConn(int fd);
PikaConn();
~PikaConn(); ~PikaConn();
/* /*
* Set the fd to nonblock && set the flag_ the the fd flag * Set the fd to nonblock && set the flag_ the the fd flag
*/ */
bool SetNonblock(); bool SetNonblock();
void InitPara();
Status PikaReadBuf(); Status PikaReadBuf();
void DriveMachine();
int PikaGetRequest(); int PikaGetRequest();
int PikaSendReply(); int PikaSendReply();
void set_fd(int fd) { fd_ = fd; };
int flags() { return flags_; };
private: int GetArgc() {return argv_.size(); };
void AddArgv(std::string a) { argv_.push_back(a); };
void Reset();
int ProcessInputBuffer();
int ProcessInlineBuffer();
int ProcessMultibulkBuffer();
int DoCmd();
private:
int fd_; int fd_;
std::list<std::string> argv_;
int flags_; int flags_;
sds rbuf_;
/*
* These functions parse the message from client
*/
Status PikaReadHeader(rio_t *rio);
Status PikaReadCode(rio_t *rio);
Status PikaReadPacket(rio_t *rio);
Status BuildObuf(int32_t opcode, const int packet_len);
/*
* The Variable need by read the buf,
* We allocate the memory when we start the server
*/
int header_len_;
int32_t r_opcode_;
char* rbuf_;
int32_t cur_pos_; int32_t cur_pos_;
int32_t rbuf_len_; int32_t rbuf_len_;
int req_type_;
int multibulklen_;
long bulklen_;
ConnStatus connStatus_; sds wbuf_;
char* wbuf_;
int32_t wbuf_len_; int32_t wbuf_len_;
int32_t wbuf_pos_; int32_t wbuf_pos_;
PikaThread *thread_; PikaThread *thread_;
......
...@@ -2,17 +2,14 @@ ...@@ -2,17 +2,14 @@
#define __PIKA_DEFINE_H__ #define __PIKA_DEFINE_H__
#define PIKA_MAX_CLIENTS 10240 #define PIKA_MAX_CLIENTS 10240
#define PIKA_MAX_MESSAGE 1024 #define PIKA_MAX_MESSAGE 10240
#define PIKA_THREAD_NUM 16 #define PIKA_THREAD_NUM 16
#define PIKA_HEARTBEAT_THREAD 1
#define HB_MAX_BUFFER 1024
#define PIKA_NAME_LEN 1024
/* /*
* The pb head and code length * Client Request type
*/ */
#define COMMAND_HEADER_LENGTH 4 #define REDIS_REQ_INLINE 1
#define COMMAND_CODE_LENGTH 4 #define REDIS_REQ_MULTIBULK 2
/* /*
* The socket block type * The socket block type
...@@ -28,53 +25,6 @@ enum EventStatus { ...@@ -28,53 +25,6 @@ enum EventStatus {
kWriteable = 2, kWriteable = 2,
}; };
enum ConnStatus {
kHeader = 0,
kCode = 1,
kPacket = 2,
kComplete = 3,
kBuildObuf = 4,
kWriteObuf = 5,
};
enum CommandCode {
kSdkInvalidOperation = 512,
kSdkSet = 513,
kSdkSetRet = 514,
kSdkDelete = 515,
kSdkDeleteRet = 516,
kSdkGet = 518,
kSdkGetRet = 519,
kSdkMGet = 526,
kSdkMGetRet = 527,
// for hash
kSdkHput = 530,
kSdkHputret = 531,
kSdkHget = 532,
kSdkHgetret = 533,
kSdkHdelete = 534,
kSdkHdeleteret = 535,
//for list
kSdkLSize = 556,
kSdkLSizeRet = 557,
kSdkLGet = 558,
kSdkLGetRet = 559,
kSdkLPut = 560,
kSdkLPutRet = 561,
kSdkLPop = 562,
kSdkLPopRet = 563,
kSdkLIndex = 564,
kSdkLIndexRet = 565,
kSdkLRange = 566,
kSdkLRangeRet = 567,
//for hb communicate
kHbSend = 601,
kHbSendRet = 602,
};
/* /*
* define the macro in pika_conf * define the macro in pika_conf
*/ */
......
#ifndef __PIKA_HB_H__
#define __PIKA_HB_H__
/*
*
* Pika heartbeat of module
* ==========================
*
* Overview:
* ---------
* The Pika heartbeat module just build connection between nodes and sent
* message between nodes. The Pika module will save the heartbeat result in srv
* The there will be pika_event thread to check whether the srv has change, if
* we find that srv has change, in the pika_event will make a change message to
* other node
*
*/
#include "status.h"
#include "csapp.h"
#include "pika_thread.h"
#include "pika_define.h"
#include "pika_epoll.h"
#include "pika_node.h"
#include "pthread.h"
#include <vector>
class HbContext;
class PikaConn;
class PikaNode;
class PikaHb
{
public:
PikaHb(std::vector<PikaNode>* cur);
~PikaHb();
/*
* run the main heartbeat process
*/
void RunProcess();
void CreatePulse();
/*
* Send the pulse to other every 3 second
*/
void StartPulse();
static void CreateHb(pthread_t* pid, PikaHb* pikaHb);
static void* StartHb(void* arg);
/*
* Connect to the adj node
*/
Status DoConnect(const char* adj_hostname, int adj_port, HbContext* hbContext);
pthread_t* thread_id() { return &thread_id_; }
private:
pthread_t thread_id_;
Status Pulse(HbContext*, const std::string &host, const int port);
std::vector<PikaNode>* cur_;
std::vector<PikaNode> getHosts_;
/*
* This debug function used to printf the node is srv or gethosts
*/
void DebugSrv();
PikaEpoll *pikaEpoll_;
/*
* Here we used auto poll to find the next hb thread,
* last_thread_ is the last hb thread
* even at the first time we only have one hb thread
*/
int last_thread_;
/*
* This is the thread that deal with heartbeat
*/
PikaThread *hbThread_[PIKA_HEARTBEAT_THREAD];
/*
* The server side of connect to other pika node
*/
std::vector<PikaConn *> hbConns_;
/*
* The client side of context to other pika node
*/
std::vector<HbContext *> hbContexts_;
/*
* The heartbeat servaddr and port information
* get the servaddr_ from the pika_worker
* get the port from config file
*/
int sockfd_;
int flags_;
int hb_port_;
struct sockaddr_in servaddr_;
bool isSeed_;
struct timeval timeout_;
/*
* Don't allow copy construct and copy assign construct
*/
PikaHb(const PikaHb&);
void operator=(const PikaHb&);
};
#endif
#ifndef PIKA_HB_MONITOR_H_
#define PIKA_HB_MONITOR_H_
/*
*
* Overview:
* ---------
*
* The event class used to deal with event that will change metadata
* There is many kinds of event
* 1. After heartbeat we find a connection is down
* 2.
*
*
*/
#include "pika_node.h"
#include <vector>
#include "pthread.h"
class PikaNode;
class PikaHbMonitor
{
public:
PikaHbMonitor(std::vector<PikaNode>* cur);
static void CreateHbMonitor(pthread_t* pid, PikaHbMonitor* pikaHbMonitor);
static void* StartHbMonitor(void* arg);
void RunProcess();
pthread_t* thread_id() { return &thread_id_;}
private:
pthread_t thread_id_;
std::vector<PikaNode>* cur_;
std::vector<PikaNode> pre;
};
#endif
#ifndef __PIKA_ITEM_H__ #ifndef __PIKA_ITEM_H__
#define __PIKA_ITEM_H__ #define __PIKA_ITEM_H__
#include "status.h"
#include "pika_define.h" #include "pika_define.h"
class PikaItem class PikaItem
......
#ifndef __PIKA_META_H__
#define __PIKA_META_H__
#include <vector>
class PikaNode;
class PikaMeta
{
public:
private:
std::vector<PikaNode> pNodes;
};
#endif
#ifndef PIKA_NODE_H_
#define PIKA_NODE_H_
#include <stdio.h>
#include <string>
class PikaNode
{
public:
PikaNode(std::string host, int port) :
host_(host),
port_(port)
{};
const std::string* host() { return &host_; }
const int port() { return port_; }
bool operator==(const PikaNode& rval);
private:
std::string host_;
int port_;
};
#endif
#ifndef __PIKA_PACKET_H__
#define __PIKA_PACKET_H__
#include "status.h"
#include <string>
#include "bada_sdk.pb.h"
/*
* sdkSet proto parse and sdkSetRet build
*/
Status SetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key, std::string *value);
void SetRetBuild(bool status, SdkSetRet *sdkSetRet);
/*
* sdkGet proto parse and sdkGetRet build
*/
Status GetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key);
void GetRetBuild(std::string &val, SdkGetRet *sdkGetRet);
/*
* hbSend proto parse and hbSendRet build
*/
Status HbSendParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *host, int &port);
void HbSendBuild(const std::string &host, const int port, HbSend* hbSend);
void HbSendRetBuild(bool status, HbSendRet *hbSendRet);
#endif
#ifndef PIKA_PAXOS_H_
#define PIKA_PAXOS_H_
#include "pthread.h"
class PikaPaxos
{
public:
private:
pthread_t therad_id_;
};
#endif
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include "status.h"
#include "csapp.h" #include "csapp.h"
#include "xdebug.h" #include "xdebug.h"
#include "pika_define.h" #include "pika_define.h"
#include "status.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
...@@ -23,13 +23,15 @@ class PikaThread; ...@@ -23,13 +23,15 @@ class PikaThread;
class PikaEpoll; class PikaEpoll;
class PikaConn; class PikaConn;
class PikaWorker class PikaServer
{ {
public: public:
PikaWorker(); PikaServer();
~PikaWorker(); ~PikaServer();
void Start(); void RunProcess();
static void* StartThread(void* arg);
private: private:
friend class PikaConn; friend class PikaConn;
...@@ -41,11 +43,6 @@ private: ...@@ -41,11 +43,6 @@ private:
int sockfd_; int sockfd_;
int flags_; int flags_;
int port_; int port_;
char host_[PIKA_NAME_LEN];
/*
* The listen socket address
*/
struct sockaddr_in servaddr_; struct sockaddr_in servaddr_;
/* /*
...@@ -70,9 +67,11 @@ private: ...@@ -70,9 +67,11 @@ private:
*/ */
PikaThread *pikaThread_[PIKA_THREAD_NUM]; PikaThread *pikaThread_[PIKA_THREAD_NUM];
// No copying allowed // No copying allowed
PikaWorker(const PikaWorker&); PikaServer(const PikaServer&);
void operator=(const PikaWorker&); void operator=(const PikaServer&);
}; };
......
...@@ -10,7 +10,6 @@ ...@@ -10,7 +10,6 @@
#include "port.h" #include "port.h"
#include "pika_item.h" #include "pika_item.h"
#include "status.h"
#include "csapp.h" #include "csapp.h"
...@@ -30,17 +29,13 @@ public: ...@@ -30,17 +29,13 @@ public:
int notify_receive_fd() { return notify_receive_fd_; } int notify_receive_fd() { return notify_receive_fd_; }
int notify_send_fd() { return notify_send_fd_; } int notify_send_fd() { return notify_send_fd_; }
static void CreateThread(pthread_t &pid, PikaThread* pikaThread);
static void* StartThread(void* arg);
pthread_t thread_id_; pthread_t thread_id_;
// port::Mutex mutex() { return mutex_; } // port::Mutex mutex() { return mutex_; }
private: private:
friend class PikaWorker; friend class PikaServer;
friend class PikaHb;
/* /*
* These two fd receive the notify from master thread * These two fd receive the notify from master thread
...@@ -58,7 +53,7 @@ private: ...@@ -58,7 +53,7 @@ private:
*/ */
PikaEpoll *pikaEpoll_; PikaEpoll *pikaEpoll_;
std::map<int, PikaConn*> conns_; std::map<int, PikaConn *> conns_;
port::Mutex mutex_; port::Mutex mutex_;
......
#ifndef __PIKA_UTIL_H__ #ifndef __PIKA_UTIL_H__
#define __PIKA_UTIL_H__ #define __PIKA_UTIL_H__
#include "pika_define.h"
int Setnonblocking(int sockfd); int Setnonblocking(int sockfd);
int SetBlockType(const int fd, int& flags, BlockType type);
int SafeClose(int fd);
#endif #endif
/* SDSLib, A C dynamic strings library
*
* Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __SDS_H
#define __SDS_H
#define SDS_MAX_PREALLOC (1024*1024)
#include <sys/types.h>
#include <stdarg.h>
typedef char *sds;
struct sdshdr {
unsigned int len;
unsigned int free;
char buf[];
};
static inline size_t sdslen(const sds s) {
struct sdshdr *sh = (sdshdr *)(s-(sizeof(struct sdshdr)));
return sh->len;
}
static inline size_t sdsavail(const sds s) {
struct sdshdr *sh = (sdshdr *)(s-(sizeof(struct sdshdr)));
return sh->free;
}
sds sdsnewlen(const void *init, size_t initlen);
sds sdsnew(const char *init);
sds sdsempty(void);
size_t sdslen(const sds s);
sds sdsdup(const sds s);
void sdsfree(sds s);
size_t sdsavail(const sds s);
sds sdsgrowzero(sds s, size_t len);
sds sdscatlen(sds s, const void *t, size_t len);
sds sdscat(sds s, const char *t);
sds sdscatsds(sds s, const sds t);
sds sdscpylen(sds s, const char *t, size_t len);
sds sdscpy(sds s, const char *t);
sds sdscatvprintf(sds s, const char *fmt, va_list ap);
#ifdef __GNUC__
sds sdscatprintf(sds s, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
#else
sds sdscatprintf(sds s, const char *fmt, ...);
#endif
sds sdscatfmt(sds s, char const *fmt, ...);
sds sdstrim(sds s, const char *cset);
void sdsrange(sds s, int start, int end);
void sdsupdatelen(sds s);
void sdsclear(sds s);
int sdscmp(const sds s1, const sds s2);
sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count);
void sdsfreesplitres(sds *tokens, int count);
void sdstolower(sds s);
void sdstoupper(sds s);
sds sdsfromlonglong(long long value);
sds sdscatrepr(sds s, const char *p, size_t len);
sds *sdssplitargs(const char *line, int *argc);
sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen);
sds sdsjoin(char **argv, int argc, char *sep);
/* Low level functions exposed to the user API */
sds sdsMakeRoomFor(sds s, size_t addlen);
void sdsIncrLen(sds s, int incr);
sds sdsRemoveFreeSpace(sds s);
size_t sdsAllocSize(sds s);
#endif
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. #ifndef PIKA_INCLUDE_SLICE_H_
// Use of this source code is governed by a BSD-style license that can be #define PIKA_INCLUDE_SLICE_H_
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Slice is a simple structure containing a pointer into some external
// storage and a size. The user of a Slice must ensure that the slice
// is not used after the corresponding external storage has been
// deallocated.
//
// Multiple threads can invoke const methods on a Slice without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Slice must use
// external synchronization.
#ifndef __BADA_SLICE_H
#define __BADA_SLICE_H
#include <assert.h> #include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <string.h> #include <string.h>
#include <string> #include <string>
class Slice { class Slice {
public: public:
// Create an empty slice. // Create an empty slice.
Slice() : data_(""), size_(0) { } Slice() : data_(""), size_(0) { }
// Create a slice that refers to d[0,n-1]. // Create a slice that refers to d[0,n-1].
Slice(const char* d, size_t n) : data_(d), size_(n) { } Slice(const char* d, size_t n) : data_(d), size_(n) { }
// Create a slice that refers to the contents of "s" // Create a slice that refers to the contents of "s"
Slice(const std::string& s) : data_(s.data()), size_(s.size()) { } Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }
// Create a slice that refers to s[0,strlen(s)-1] // Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) { } Slice(const char* s) : data_(s), size_(strlen(s)) { }
// Return a pointer to the beginning of the referenced data // Return a pointer to the beginning of the referenced data
const char* data() const { return data_; } const char* data() const { return data_; }
// Return the length (in bytes) of the referenced data // Return the length (in bytes) of the referenced data
size_t size() const { return size_; } size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero // Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; } bool empty() const { return size_ == 0; }
// Return the ith byte in the referenced data. // Return the ith byte in the referenced data.
// REQUIRES: n < size() // REQUIRES: n < size()
char operator[](size_t n) const { char operator[](size_t n) const {
assert(n < size()); assert(n < size());
return data_[n]; return data_[n];
} }
// Change this slice to refer to an empty array // Change this slice to refer to an empty array
void clear() { data_ = ""; size_ = 0; } void clear() { data_ = ""; size_ = 0; }
// Drop the first "n" bytes from this slice. // Drop the first "n" bytes from this slice.
void remove_prefix(size_t n) { void remove_prefix(size_t n) {
assert(n <= size()); assert(n <= size());
data_ += n; data_ += n;
size_ -= n; size_ -= n;
} }
// Return a string that contains the copy of the referenced data. // Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); } std::string ToString() const { return std::string(data_, size_); }
// Three-way comparison. Returns value: // Three-way comparison. Returns value:
// < 0 iff "*this" < "b", // < 0 iff "*this" < "b",
// == 0 iff "*this" == "b", // == 0 iff "*this" == "b",
// > 0 iff "*this" > "b" // > 0 iff "*this" > "b"
int compare(const Slice& b) const; int compare(const Slice& b) const;
// Return true iff "x" is a prefix of "*this" // Return true iff "x" is a prefix of "*this"
bool starts_with(const Slice& x) const { bool starts_with(const Slice& x) const {
return ((size_ >= x.size_) && return ((size_ >= x.size_) &&
(memcmp(data_, x.data_, x.size_) == 0)); (memcmp(data_, x.data_, x.size_) == 0));
} }
private: private:
const char* data_; const char* data_;
size_t size_; size_t size_;
// Intentionally copyable // Intentionally copyable
}; };
inline bool operator==(const Slice& x, const Slice& y) { inline bool operator==(const Slice& x, const Slice& y) {
return ((x.size() == y.size()) && return ((x.size() == y.size()) &&
(memcmp(x.data(), y.data(), x.size()) == 0)); (memcmp(x.data(), y.data(), x.size()) == 0));
} }
inline bool operator!=(const Slice& x, const Slice& y) { inline bool operator!=(const Slice& x, const Slice& y) {
return !(x == y); return !(x == y);
} }
inline int Slice::compare(const Slice& b) const { inline int Slice::compare(const Slice& b) const {
const int min_len = (size_ < b.size_) ? size_ : b.size_; const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
int r = memcmp(data_, b.data_, min_len); int r = memcmp(data_, b.data_, min_len);
if (r == 0) { if (r == 0) {
if (size_ < b.size_) r = -1; if (size_ < b.size_) r = -1;
else if (size_ > b.size_) r = +1; else if (size_ > b.size_) r = +1;
} }
return r; return r;
} }
#endif
#endif // PIKA_INCLUDE_SLICE_H_
#ifndef __BADA_STATUS_H #ifndef PIKA_INCLUDE_STATUS_H_
#define __BADA_STATUS_H #define PIKA_INCLUDE_STATUS_H_
#include <string> #include <string>
#include "slice.h" #include "slice.h"
#include "stdint.h"
class Status { class Status {
public: public:
// Create a success status. // Create a success status.
Status() : state_(NULL) { } Status() : state_(NULL) { }
~Status() { delete[] state_; } ~Status() { delete[] state_; }
// Copy the specified status. // Copy the specified status.
Status(const Status& s); Status(const Status& s);
void operator=(const Status& s); void operator=(const Status& s);
// Return a success status. // Return a success status.
static Status OK() { return Status(); } static Status OK() { return Status(); }
// Return error status of an appropriate type. // Return error status of an appropriate type.
static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) { static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotFound, msg, msg2); return Status(kNotFound, msg, msg2);
} }
static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) { static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kCorruption, msg, msg2); return Status(kCorruption, msg, msg2);
} }
static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice()) { static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotSupported, msg, msg2); return Status(kNotSupported, msg, msg2);
} }
static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice()) { static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kInvalidArgument, msg, msg2); return Status(kInvalidArgument, msg, msg2);
} }
static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, msg, msg2); return Status(kIOError, msg, msg2);
} }
static Status IOError(const std::string& context, int err_number) {
return IOError(context, strerror(err_number)); // Returns true iff the status indicates success.
} bool ok() const { return (state_ == NULL); }
static Status EndFile(const Slice& msg, const Slice& msg2 = Slice()) { // Returns true iff the status indicates a NotFound error.
return Status(kEndFile, msg, msg2); bool IsNotFound() const { return code() == kNotFound; }
}
// Returns true iff the status indicates a Corruption error.
// Returns true iff the status indicates success. bool IsCorruption() const { return code() == kCorruption; }
bool ok() const { return (state_ == NULL); }
// Returns true iff the status indicates an IOError.
// Returns true iff the status indicates a NotFound error. bool IsIOError() const { return code() == kIOError; }
bool IsNotFound() const { return code() == kNotFound; }
// Return a string representation of this status suitable for printing.
// Returns true iff the status indicates a Corruption error. // Returns the string "OK" for success.
bool IsCorruption() const { return code() == kCorruption; } std::string ToString() const;
// Returns true iff the status indicates an IOError. private:
bool IsIOError() const { return code() == kIOError; } // OK status has a NULL state_. Otherwise, state_ is a new[] array
// of the following form:
bool IsEndFile() const { return code() == kEndFile; } // state_[0..3] == length of message
// state_[4] == code
// Return a string representation of this status suitable for printing. // state_[5..] == message
// Returns the string "OK" for success. const char* state_;
std::string ToString() const;
enum Code {
private: kOk = 0,
// OK status has a NULL state_. Otherwise, state_ is a new[] array kNotFound = 1,
// of the following form: kCorruption = 2,
// state_[0..3] == length of message kNotSupported = 3,
// state_[4] == code kInvalidArgument = 4,
// state_[5..] == message kIOError = 5
const char* state_; };
enum Code { Code code() const {
kOk = 0, return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]);
kNotFound = 1, }
kCorruption = 2,
kNotSupported = 3, Status(Code code, const Slice& msg, const Slice& msg2);
kInvalidArgument = 4, static const char* CopyState(const char* s);
kIOError = 5,
kEndFile = 6
};
Code code() const {
return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]);
}
Status(Code code, const Slice& msg, const Slice& msg2);
static const char* CopyState(const char* s);
}; };
inline Status::Status(const Status& s) { inline Status::Status(const Status& s) {
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
} }
inline void Status::operator=(const Status& s) { inline void Status::operator=(const Status& s) {
// The following condition catches both aliasing (when this == &s), // The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok. // and the common case where both s and *this are ok.
if (state_ != s.state_) { if (state_ != s.state_) {
delete[] state_; delete[] state_;
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
} }
} }
#endif #endif // PIKA_INCLUDE_STATUS_H_
/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __REDIS_UTIL_H
#define __REDIS_UTIL_H
#include "sds.h"
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
int stringmatch(const char *p, const char *s, int nocase);
long long memtoll(const char *p, int *err);
int ll2string(char *s, size_t len, long long value);
int string2ll(const char *s, size_t slen, long long *value);
int string2l(const char *s, size_t slen, long *value);
int d2string(char *buf, size_t len, double value);
sds getAbsolutePath(char *filename);
int pathIsBaseName(char *path);
#endif
/* zmalloc - total amount of allocated memory aware version of malloc()
*
* Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __ZMALLOC_H
#define __ZMALLOC_H
/* Double expansion needed for stringification of macro values. */
#define __xstr(s) __str(s)
#define __str(s) #s
#if defined(USE_TCMALLOC)
#define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR))
#include <google/tcmalloc.h>
#if (TC_VERSION_MAJOR == 1 && TC_VERSION_MINOR >= 6) || (TC_VERSION_MAJOR > 1)
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) tc_malloc_size(p)
#else
#error "Newer version of tcmalloc required"
#endif
#elif defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#include <jemalloc/jemalloc.h>
#if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) je_malloc_usable_size(p)
#else
#error "Newer version of jemalloc required"
#endif
#elif defined(__APPLE__)
#include <malloc/malloc.h>
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) malloc_size(p)
#endif
#ifndef ZMALLOC_LIB
#define ZMALLOC_LIB "libc"
#endif
void *zmalloc(size_t size);
void *zcalloc(size_t size);
void *zrealloc(void *ptr, size_t size);
void zfree(void *ptr);
char *zstrdup(const char *s);
size_t zmalloc_used_memory(void);
void zmalloc_enable_thread_safeness(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
float zmalloc_get_fragmentation_ratio(size_t rss);
size_t zmalloc_get_rss(void);
size_t zmalloc_get_private_dirty(void);
size_t zmalloc_get_smap_bytes_by_field(char *field);
void zlibc_free(void *ptr);
#ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr);
#endif
#endif /* __ZMALLOC_H */
此差异已折叠。
此差异已折叠。
#include "hb_context.h"
#include "csapp.h"
#include "xdebug.h"
#include "pika_util.h"
#include "pika_define.h"
HbContext::HbContext()
{
fd_ = -1;
obuf_ = (char *)malloc(sizeof(char) * HB_MAX_BUFFER);
obuf_len_ = 0;
header_len_ = 0;
r_opcode_ = 0;
rbuf_len_ = 0;
rbuf_ = (char *)malloc(sizeof(char) * HB_MAX_BUFFER);
}
HbContext::~HbContext()
{
free(obuf_);
free(rbuf_);
SafeClose(fd_);
}
Status HbContext::SetBlockType(BlockType type)
{
Status s;
if ((flags_ = fcntl(fd_, F_GETFL, 0)) < 0) {
s = Status::Corruption("F_GETFEL error");
SafeClose(fd_);
return s;
}
if (type == kBlock) {
flags_ &= (~O_NONBLOCK);
} else if (type == kNonBlock) {
flags_ |= O_NONBLOCK;
}
if (fcntl(fd_, F_SETFL, flags_) < 0) {
s = Status::Corruption("F_SETFL error");
SafeClose(fd_);
return s;
}
return Status::OK();
}
Status HbContext::SetTcpNoDelay()
{
Status s;
int yes = 1;
if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) {
s = Status::Corruption("setsockopt(TCO_NODELAY) error");
SafeClose(fd_);
return s;
}
return s;
}
char* HbContext::GetContextBuffer(int *buf_len)
{
if(!buf_len){
return NULL;
}
int offset = COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH;
*buf_len = HB_MAX_BUFFER - offset;
return obuf_ + offset;
}
Status HbContext::BuildObuf(int32_t opcode, const int packet_len)
{
Status s;
//obuf_ = (char *)realloc(obuf_, sizeof(char) * (packet_len + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH));
uint32_t code_len = COMMAND_CODE_LENGTH + packet_len;
uint32_t u;
u = htonl(code_len);
memcpy(obuf_, &u, sizeof(uint32_t));
u = htonl(opcode);
memcpy(obuf_ + COMMAND_CODE_LENGTH, &u, sizeof(uint32_t));
//memcpy(obuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, packet, packet_len);
obuf_len_ = COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH + packet_len;
r_opcode_ = opcode;
return s;
}
Status HbContext::HbBufferWrite()
{
log_info("Hb obuf_len %d", obuf_len_);
Status s;
int32_t nwritten = 0;
nwritten = rio_writen(fd_, (void *)obuf_, obuf_len_);
if (nwritten == -1) {
if ((errno == EAGAIN && (flags_ & O_NONBLOCK)) || (errno == EINTR)) {
} else {
s = Status::IOError(obuf_, "write heartbeat context error");
return s;
}
}
return Status::OK();
}
Status HbContext::HbBufferRead()
{
Status s;
rio_t rio;
rio_readinitb(&rio, fd_);
s = HbBufferReadHeader(&rio);
if (!s.ok()) {
return s;
}
s = HbBufferReadCode(&rio);
if (!s.ok()) {
return s;
}
s = HbBufferReadPacket(&rio);
return s;
}
Status HbContext::HbBufferReadHeader(rio_t *rio)
{
Status s;
char buf[1024];
int32_t integer = 0;
ssize_t nread;
while (1) {
nread = rio_readnb(rio, buf, COMMAND_HEADER_LENGTH);
if (nread == -1) {
if ((errno == EAGAIN && (flags_ & O_NONBLOCK)) || (errno == EINTR)) {
continue;
} else {
s = Status::IOError("read command header error");
return s;
}
} else if (nread == 0){
return Status::Corruption("Connect has interrupt");
} else {
break;
}
}
memcpy((char *)(&integer), buf, sizeof(int32_t));
header_len_ = ntohl(integer);
return Status::OK();
}
Status HbContext::HbBufferReadCode(rio_t *rio)
{
Status s;
char buf[1024];
int32_t integer = 0;
ssize_t nread = 0;
while (1) {
nread = rio_readnb(rio, buf, COMMAND_CODE_LENGTH);
if (nread == -1) {
if ((errno == EAGAIN && (flags_ & O_NONBLOCK)) || (errno == EINTR)) {
continue;
} else {
s = Status::IOError("read command code error");
return s;
}
} else if (nread == 0){
return Status::Corruption("Connect has interrupt");
} else {
break;
}
}
memcpy((char *)(&integer), buf, sizeof(int32_t));
r_opcode_ = ntohl(integer);
return Status::OK();
}
Status HbContext::HbBufferReadPacket(rio_t *rio)
{
Status s;
//char buf[MAX_PACKAGE_LEN];
int nread = 0;
while (1) {
nread = rio_readnb(rio, (void*)rbuf_, header_len_ - 4);
if (nread == -1) {
if ((errno == EAGAIN && (flags_ & O_NONBLOCK)) || (errno == EINTR)) {
continue;
} else {
s = Status::IOError("read data error");
return s;
}
} else if (nread == 0){
return Status::Corruption("Connect has interrupt");
} else {
break;
}
}
// todo to check the length of rbuf_
//rbuf_ = (char *)realloc(rbuf_, sizeof(char) * nread);
//memcpy(rbuf_, buf, nread);
rbuf_len_ = nread;
return s;
}
#include "pika.h"
#include "xdebug.h"
#include <glog/logging.h>
#include "pika_conf.h"
#include "pika_hb.h"
#include <iostream>
#include <signal.h>
Pika *gPika;
PikaConf *gPikaConf;
static void pika_glog_init()
{
FLAGS_log_dir = gPikaConf->log_path();
::google::InitGoogleLogging("pika");
FLAGS_minloglevel = gPikaConf->log_level();
LOG(WARNING) << "Pika glog init";
/*
* dump some usefull message when crash on certain signals
*/
// google::InstallFailureSignalHandler();
}
static void sig_handler(const int sig)
{
printf("Caught signal %d", sig);
}
void pika_signal_setup()
{
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
LOG(WARNING) << "pika signal setup ok";
}
static void version()
{
printf("-----------Pika server 1.0.0----------\n");
}
void pika_init_conf(const char* path)
{
gPikaConf = new PikaConf(path);
if (gPikaConf == NULL) {
LOG(FATAL) << "pika load conf error";
}
version();
printf("-----------Pika config list----------\n");
gPikaConf->DumpConf();
printf("-----------Pika config end----------\n");
}
static void usage()
{
fprintf(stderr,
"Pika module 1.0.0\n"
"need One parameters\n"
"-D the conf path \n"
"-h show this usage message \n"
"example: ./bin/pika -D./conf/pika.conf\n"
);
}
int main(int argc, char **argv)
{
bool pathOpt = false;
char c;
char path[PIKA_LINE_SIZE];
if (argc != 2) {
usage();
return -1;
}
while (-1 != (c = getopt(argc, argv, "D:h"))) {
switch (c) {
case 'D':
snprintf(path, PIKA_LINE_SIZE, "%s", optarg);
pathOpt = 1;
break;
case 'h':
usage();
return 0;
default:
usage();
return 0;
}
}
/*
* check wether set the conf path
*/
if (pathOpt == false) {
LOG(FATAL) << "Don't get the conf path";
}
/*
* init the conf
*/
pika_init_conf(path);
/*
* init the glog config
*/
pika_glog_init();
/*
* set up the signal
*/
pika_signal_setup();
/*
* Init the pika
* Inside the pika, we have pikaWorker, pikaHb
*/
gPika = new Pika();
if (gPika != NULL) {
LOG(WARNING) << "Pika Server init ok";
} else {
LOG(FATAL) << "Pika Server init error";
}
/*
* Before now, all we do is initial all server
* Then we will start our server in order
*/
log_info("Start running heartbeat");
/*
* Start the heartbeat module
*/
gPika->RunHb();
/*
* Start the pulse thread
*/
gPika->RunPulse();
gPika->RunHbMonitor();
/*
* Start main worker, the main server must be the last one to start
* because we must prepare and check everything before we can serve
*/
LOG(WARNING) << "Pika Server going to start";
log_info("Start running pika main server");
gPika->RunWorker();
return 0;
}
#include "pika.h" #include "pika_server.h"
#include "pika_worker.h" #include "xdebug.h"
#include "pika_hb.h" #include <glog/logging.h>
#include "pika_meta.h" #include "pika_conf.h"
#include "pika_hb_monitor.h"
Pika::Pika()
{
pikaMeta_ = new PikaMeta();
pikaWorker_ = new PikaWorker();
pikaHb_ = new PikaHb(&preNode_);
pikaHbMonitor_ = new PikaHbMonitor(&preNode_);
}
Pika::~Pika() #include <iostream>
#include <signal.h>
PikaConf *g_pikaConf;
PikaServer *g_pikaServer;
static void pika_glog_init()
{ {
delete pikaHbMonitor_; FLAGS_log_dir = g_pikaConf->log_path();
delete pikaHb_; ::google::InitGoogleLogging("pika");
delete pikaWorker_; FLAGS_minloglevel = g_pikaConf->log_level();
delete pikaMeta_; LOG(WARNING) << "Pika glog init";
/*
* dump some usefull message when crash on certain signals
*/
// google::InstallFailureSignalHandler();
} }
int Pika::RunHb() static void sig_handler(const int sig)
{ {
PikaHb::CreateHb(pikaHb_->thread_id(), pikaHb_); printf("Caught signal %d", sig);
return 0;
} }
int Pika::RunHbMonitor() void pika_signal_setup()
{ {
PikaHbMonitor::CreateHbMonitor(pikaHbMonitor_->thread_id(), pikaHbMonitor_); signal(SIGHUP, SIG_IGN);
return 0; signal(SIGPIPE, SIG_IGN);
LOG(WARNING) << "pika signal setup ok";
} }
int Pika::RunWorker()
static void version()
{ {
pikaWorker_->Start(); printf("-----------Pika server 1.0.0----------\n");
/*
* pthread_create(&workerId_, NULL, &(Pika::StartHb), pikaWorker_);
*/
return 0;
} }
void pika_init_conf(const char* path)
void* Pika::StartWorker(void* arg)
{ {
reinterpret_cast<PikaWorker*>(arg)->Start(); g_pikaConf = new PikaConf(path);
return NULL; if (g_pikaConf == NULL) {
LOG(FATAL) << "pika load conf error";
}
version();
printf("-----------Pika config list----------\n");
g_pikaConf->DumpConf();
printf("-----------Pika config end----------\n");
} }
int Pika::RunPulse()
static void usage()
{ {
pthread_create(&hbId_, NULL, &(Pika::StartPulse), pikaHb_); fprintf(stderr,
"Pika module 1.0.0\n"
"need One parameters\n"
"-D the conf path \n"
"-h show this usage message \n"
"example: ./bin/pika -D./conf/pika.conf\n"
);
} }
void* Pika::StartPulse(void* arg) int main(int argc, char **argv)
{ {
reinterpret_cast<PikaHb*>(arg)->StartPulse(); bool path_opt = false;
return NULL; char c;
char path[PIKA_LINE_SIZE];
if (argc != 2) {
usage();
return -1;
}
while (-1 != (c = getopt(argc, argv, "D:h"))) {
switch (c) {
case 'D':
snprintf(path, PIKA_LINE_SIZE, "%s", optarg);
path_opt = 1;
break;
case 'h':
usage();
return 0;
default:
usage();
return 0;
}
}
/*
* check wether set the conf path
*/
if (path_opt == false) {
LOG(FATAL) << "Don't get the conf path";
}
/*
* init the conf
*/
pika_init_conf(path);
/*
* init the glog config
*/
pika_glog_init();
/*
* set up the signal
*/
pika_signal_setup();
/*
* Init the server
*/
g_pikaServer = new PikaServer();
if (g_pikaServer != NULL) {
LOG(WARNING) << "Pika Server init ok";
} else {
LOG(FATAL) << "Pika Server init error";
}
LOG(WARNING) << "Pika Server going to start";
g_pikaServer->RunProcess();
return 0;
} }
...@@ -10,8 +10,4 @@ PikaConf::PikaConf(const char* path) : ...@@ -10,8 +10,4 @@ PikaConf::PikaConf(const char* path) :
getConfInt("thread_num", &thread_num_); getConfInt("thread_num", &thread_num_);
getConfStr("log_path", log_path_); getConfStr("log_path", log_path_);
getConfInt("log_level", &log_level_); getConfInt("log_level", &log_level_);
getConfInt("hb_port", &hb_port_);
getConfStr("seed", seed_);
getConfInt("seed_port", &seed_port_);
getConfStr("data_path", data_path_);
} }
此差异已折叠。
...@@ -25,12 +25,10 @@ Status PikaEpoll::PikaAddEvent(int fd, int mask) ...@@ -25,12 +25,10 @@ Status PikaEpoll::PikaAddEvent(int fd, int mask)
{ {
struct epoll_event ee; struct epoll_event ee;
ee.data.fd = fd; ee.data.fd = fd;
/* // log_info("PikaAddEvent mask %d", mask);
* log_info("PikaAddEvent mask %d", mask);
*/
ee.events = mask; ee.events = mask;
if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ee) == -1) { if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ee) == -1) {
log_info("Epoll add error"); // log_info("Epoll add error");
return Status::Corruption("epollAdd error"); return Status::Corruption("epollAdd error");
} }
return Status::OK(); return Status::OK();
...@@ -39,15 +37,13 @@ Status PikaEpoll::PikaAddEvent(int fd, int mask) ...@@ -39,15 +37,13 @@ Status PikaEpoll::PikaAddEvent(int fd, int mask)
Status PikaEpoll::PikaModEvent(int fd, int oMask, int mask) Status PikaEpoll::PikaModEvent(int fd, int oMask, int mask)
{ {
/* // log_info("PikaModEvent mask %d %d", fd, (oMask | mask));
* log_info("PikaModEvent mask %d %d", fd, (oMask | mask));
*/
struct epoll_event ee; struct epoll_event ee;
ee.data.u64 = 0; ee.data.u64 = 0;
ee.data.fd = fd; ee.data.fd = fd;
ee.events = (oMask | mask); ee.events = (oMask | mask);
if (epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ee) == -1) { if (epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ee) == -1) {
log_info("Epoll ctl error"); // log_info("Epoll ctl error");
return Status::Corruption("epollCtl error"); return Status::Corruption("epollCtl error");
} }
return Status::OK(); return Status::OK();
...@@ -72,9 +68,7 @@ int PikaEpoll::PikaPoll() ...@@ -72,9 +68,7 @@ int PikaEpoll::PikaPoll()
for (int i = 0; i < numevents; i++) { for (int i = 0; i < numevents; i++) {
int mask = 0; int mask = 0;
firedevent_[i].fd_ = (events_ + i)->data.fd; firedevent_[i].fd_ = (events_ + i)->data.fd;
/* // log_info("events + i events %d", (events_ + i)->events);
* log_info("events + i events %d", (events_ + i)->events);
*/
if ((events_ + i)->events & EPOLLIN) { if ((events_ + i)->events & EPOLLIN) {
mask |= EPOLLIN; mask |= EPOLLIN;
} }
......
#include "pika_hb.h"
#include "pika_conf.h"
#include "status.h"
#include "pika_util.h"
#include "pika_thread.h"
#include "pika_item.h"
#include "pika_worker.h"
#include "mutexlock.h"
#include <glog/logging.h>
#include "pika_conn.h"
#include "hb_context.h"
#include "bada_sdk.pb.h"
#include "pika_packet.h"
#include <poll.h>
#include <vector>
extern PikaConf *gPikaConf;
PikaHb::PikaHb(std::vector<PikaNode>* cur) :
cur_(cur)
{
thread_id_ = pthread_self();
// init sock
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
memset(&servaddr_, 0, sizeof(servaddr_));
hb_port_ = gPikaConf->hb_port();
servaddr_.sin_family = AF_INET;
servaddr_.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr_.sin_port = htons(hb_port_);
char* tmp = gPikaConf->seed();
int sp = gPikaConf->seed_port();
log_info("seed%sseed %d", tmp, sp);
if (tmp[0] != '\0') {
log_info("seed%sseed %d", tmp, sp);
cur_->push_back(PikaNode(std::string(tmp), sp));
DebugSrv();
}
bind(sockfd_, (struct sockaddr *) &servaddr_, sizeof(servaddr_));
listen(sockfd_, 10);
timeout_.tv_sec = 1;
timeout_.tv_usec = 500000;
SetBlockType(sockfd_, flags_, kNonBlock);
/*
* inital the pikaepoll object, add the notify_receive_fd to epoll
*/
pikaEpoll_ = new PikaEpoll();
pikaEpoll_->PikaAddEvent(sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP);
last_thread_ = 0;
for (int i = 0; i < PIKA_HEARTBEAT_THREAD; i++) {
hbThread_[i] = new PikaThread();
}
// start the hbThread thread
for (int i = 0; i < PIKA_HEARTBEAT_THREAD; i++) {
pthread_create(&(hbThread_[i]->thread_id_), NULL, &(PikaThread::StartThread), hbThread_[i]);
}
CreatePulse();
}
PikaHb::~PikaHb()
{
}
void PikaHb::CreateHb(pthread_t* pid, PikaHb* pikaHb)
{
pthread_create(pid, NULL, &(PikaHb::StartHb), pikaHb);
}
void* PikaHb::StartHb(void* arg)
{
reinterpret_cast<PikaHb*>(arg)->RunProcess();
return NULL;
}
void PikaHb::RunProcess()
{
int nfds;
PikaFiredEvent *tfe;
Status s;
struct sockaddr_in cliaddr;
socklen_t clilen;
int fd, connfd;
for (;;) {
nfds = pikaEpoll_->PikaPoll();
tfe = pikaEpoll_->firedevent();
for (int i = 0; i < nfds; i++) {
fd = (tfe + i)->fd_;
if (fd == sockfd_ && ((tfe + i)->mask_ & EPOLLIN)) {
connfd = accept(sockfd_, (struct sockaddr *) &cliaddr, &clilen);
log_info("Accept new fd %d", connfd);
std::queue<PikaItem> *q = &(hbThread_[last_thread_]->conn_queue_);
log_info("Tfe must happen");
PikaItem ti(connfd);
{
MutexLock l(&hbThread_[last_thread_]->mutex_);
q->push(ti);
}
write(hbThread_[last_thread_]->notify_send_fd(), "", 1);
last_thread_++;
last_thread_ %= PIKA_THREAD_NUM;
} else if ((tfe + i)->mask_ & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
LOG(WARNING) << "Epoll timeout event";
}
}
}
}
Status PikaHb::DoConnect(const char* adj_hostname, int adj_port, HbContext* hbContext)
{
Status s;
int sockfd, rv;
char port[6];
struct addrinfo hints, *servinfo, *p;
snprintf(port, 6, "%d", adj_port);
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if ((rv = getaddrinfo(adj_hostname, port, &hints, &servinfo)) != 0) {
hints.ai_family = AF_INET6;
if ((rv = getaddrinfo(adj_hostname, port, &hints, &servinfo)) != 0) {
s = Status::IOError("tcp_connect error for ", adj_hostname);
return s;
}
}
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
continue;
}
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
if (errno == EHOSTUNREACH) {
close(sockfd);
continue;
} else if (errno == EINPROGRESS && (hbContext->flags() & O_NONBLOCK)) {
/**
* This branch mean the fd is NONBLOCK, so EINPROGRESS is true
*/
} else {
if (errno == EINPROGRESS) {
struct pollfd wfd[1];
long msec = (timeout_.tv_sec * 1000) + ((timeout_.tv_usec + 999) / 1000);
wfd[0].fd = sockfd;
wfd[0].events = POLLOUT;
int res;
if ((res = poll(wfd, 1, msec)) == -1) {
close(sockfd);
freeaddrinfo(servinfo);
return Status::IOError("EHOSTUNREACH", "The target host cannot be reached");
} else if (res == 0) {
errno = ETIMEDOUT;
close(sockfd);
freeaddrinfo(servinfo);
return Status::IOError("ETIMEDOUT", "The target host connect timeout");
}
int err = 0;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
freeaddrinfo(servinfo);
return Status::IOError("EHOSTUNREACH", "The target host cannot be reached");
}
if (err) {
errno = err;
freeaddrinfo(servinfo);
return Status::IOError("EHOSTUNREACH", "The target host cannot be reached");
}
}
}
}
flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);
s = Status::OK();
hbContext->set_fd(sockfd);
hbContext->SetBlockType(kBlock);
hbContext->SetTcpNoDelay();
freeaddrinfo(servinfo);
return s;
}
if (p == NULL) {
s = Status::IOError(strerror(errno), "Can't create socket ");
return s;
}
freeaddrinfo(servinfo);
freeaddrinfo(p);
}
void PikaHb::CreatePulse()
{
Status s;
std::vector<PikaNode>::iterator iter;
for (iter = cur_->begin(); iter != cur_->end(); iter++) {
HbContext *tmp = new HbContext();
log_info("%s %d", (*iter).host()->c_str(), (*iter).port());
s = DoConnect((*iter).host()->c_str(), (*iter).port(), tmp);
log_info("Status %s", s.ToString().c_str());
if (s.ok()) {
hbContexts_.push_back(tmp);
}
}
}
/*
* void PikaHb::DestroyPulse()
* {
* }
*/
Status PikaHb::Pulse(HbContext* hbContext, const std::string &host, const int port)
{
Status s;
HbSend hbSend;
HbSendBuild(host, port, &hbSend);
int buf_len = 0;
char* buf = hbContext->GetContextBuffer(&buf_len);
log_info("hbContext fd %d", hbContext->fd());
if (!buf) {
s = Status::InvalidArgument("context no buffer");
return s;
}
bool ret = hbSend.SerializeToArray(buf, buf_len);
if (!ret) {
s = Status::InvalidArgument("pb serialize error");
}
hbContext->BuildObuf(kHbSend, hbSend.ByteSize());
s = hbContext->HbBufferWrite();
return s;
}
void PikaHb::StartPulse()
{
Status s;
std::vector<HbContext *>::iterator iter;
while (1) {
log_info("%d", hbContexts_.size());
for (iter = hbContexts_.begin(); iter != hbContexts_.end(); iter++) {
s = Pulse(reinterpret_cast<HbContext*>(*iter), "127.0.0.1", 9876);
/*
* if (s.ok() != 0) {
* hbContexts_.erase(iter);
* }
*/
}
DebugSrv();
sleep(3);
}
}
void PikaHb::DebugSrv()
{
log_info("-----------DebugSrv----------\n");
std::vector<PikaNode>::iterator iter;
for (iter = cur_->begin(); iter != cur_->end(); iter++) {
log_info("host %s port %d", iter->host()->c_str(), iter->port());
}
}
#include "pika_hb_monitor.h"
#include "xdebug.h"
#include <algorithm>
PikaHbMonitor::PikaHbMonitor(std::vector<PikaNode>* cur) :
cur_(cur)
{
thread_id_ = pthread_self();
pre.clear();
}
void PikaHbMonitor::CreateHbMonitor(pthread_t* pid, PikaHbMonitor* pikaHbMonitor)
{
pthread_create(pid, NULL, &(PikaHbMonitor::StartHbMonitor), pikaHbMonitor);
return;
}
void* PikaHbMonitor::StartHbMonitor(void* arg)
{
reinterpret_cast<PikaHbMonitor*>(arg)->RunProcess();
return NULL;
}
void PikaHbMonitor::RunProcess()
{
std::vector<PikaNode>::iterator iter;
std::vector<PikaNode>::iterator preIter;
while (1) {
for (iter = cur_->begin(); iter != cur_->end(); iter++) {
log_info("compare pre cur host %s port %d", (*iter).host()->c_str(), (*iter).port());
if (std::find(pre.begin(), pre.end(), *iter) == cur_->end()) {
/*
* This node can't found in the pre node list, this mean the node havn't
* added into the metadata before. so we need add to the
* metadata
*/
}
}
for (preIter = pre.begin(); preIter != pre.end(); preIter++) {
log_info("compare cur cur host %s port %d", (*preIter).host()->c_str(), (*preIter).port());
/*
* if we find this node in the cur node list, this mean we have add this node to the metadata
*/
if (std::find(cur_->begin(), cur_->end(), *preIter) == pre.end()) {
/*
* This node can't found in the cur_ node list, this mean the node is
* disconnected, so we need delete the node from the metadata
*/
}
}
sleep(3);
}
}
#include "pika_node.h"
bool PikaNode::operator==(const PikaNode& rval)
{
if ((*(rval.host())) == this->host_ && rval.port() == this->port_) {
return true;
}
return false;
}
#include "pika_packet.h"
#include "pika_define.h"
Status SetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key, std::string *value)
{
Status s;
if (opcode == kSdkInvalidOperation) {
SdkInvalidOperation sdkInvalidOperation;
if (sdkInvalidOperation.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse invalid operation error");
return s;
}
if (sdkInvalidOperation.what() == 1003) {
return Status::NotFound("Can't not found the key");
}
return Status::InvalidArgument("Invalid operation");
} else {
SdkSet sdkSet;
if (!sdkSet.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse error");
return s;
}
if (sdkSet.opcode() == kSdkSet) {
key->assign(sdkSet.key().data(), sdkSet.key().size());
value->assign(sdkSet.value().data(), sdkSet.value().size());
} else {
s = Status::IOError("Set error");
}
}
return s;
}
void SetRetBuild(bool status, SdkSetRet *sdkSetRet)
{
sdkSetRet->set_opcode(kSdkSetRet);
sdkSetRet->set_status(status);
}
Status GetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key)
{
Status s;
if (opcode == kSdkInvalidOperation) {
SdkInvalidOperation sdkInvalidOperation;
if (sdkInvalidOperation.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse invalid operation error");
return s;
}
if (sdkInvalidOperation.what() == 1003) {
return Status::NotFound("Can't not found the key");
}
return Status::InvalidArgument("Invalid operation");
} else {
SdkGet sdkGet;
if (!sdkGet.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse error");
return s;
}
if (sdkGet.opcode() == kSdkGet) {
key->assign(sdkGet.key().data(), sdkGet.key().size());
} else {
s = Status::IOError("Get error");
}
}
return s;
}
void GetRetBuild(std::string &value, SdkGetRet *sdkGetRet)
{
sdkGetRet->set_opcode(kSdkGetRet);
sdkGetRet->set_value(value);
}
Status HbSendParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *host, int &port)
{
Status s;
if (opcode == kSdkInvalidOperation) {
SdkInvalidOperation sdkInvalidOperation;
if (sdkInvalidOperation.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse invalid operation error");
return s;
}
if (sdkInvalidOperation.what() == 1003) {
return Status::NotFound("Can't not found the key");
}
return Status::InvalidArgument("Invalid operation");
} else {
HbSend hbSend;
if (!hbSend.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse error");
return s;
}
if (hbSend.opcode() == kHbSend) {
host->assign(hbSend.host().data(), hbSend.host().size());
port = hbSend.port();
} else {
s = Status::IOError("Get error");
}
}
return s;
}
void HbSendBuild(const std::string &host, const int port, HbSend* hbSend)
{
hbSend->set_opcode(kHbSend);
hbSend->set_host(host);
hbSend->set_port(port);
}
void HbSendRetBuild(bool status, HbSendRet *hbSendRet)
{
hbSendRet->set_opcode(kHbSendRet);
hbSendRet->set_status(status);
}
#include "pika_paxos.h"
#include "pika_worker.h"
#include "pika_define.h" #include "pika_define.h"
#include "pika_util.h" #include "pika_util.h"
#include "pika_epoll.h" #include "pika_epoll.h"
...@@ -6,12 +5,12 @@ ...@@ -6,12 +5,12 @@
#include "pika_thread.h" #include "pika_thread.h"
#include "pika_conf.h" #include "pika_conf.h"
#include "mutexlock.h" #include "mutexlock.h"
#include "pika_server.h"
#include <glog/logging.h> #include <glog/logging.h>
#include "status.h"
extern PikaConf *gPikaConf; extern PikaConf *g_pikaConf;
Status PikaWorker::SetBlockType(BlockType type) Status PikaServer::SetBlockType(BlockType type)
{ {
Status s; Status s;
if ((flags_ = fcntl(sockfd_, F_GETFL, 0)) < 0) { if ((flags_ = fcntl(sockfd_, F_GETFL, 0)) < 0) {
...@@ -32,62 +31,64 @@ Status PikaWorker::SetBlockType(BlockType type) ...@@ -32,62 +31,64 @@ Status PikaWorker::SetBlockType(BlockType type)
return Status::OK(); return Status::OK();
} }
PikaWorker::PikaWorker() PikaServer::PikaServer()
{ {
// init sock // init sock
sockfd_ = socket(AF_INET, SOCK_STREAM, 0); sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
memset(&servaddr_, 0, sizeof(servaddr_)); memset(&servaddr_, 0, sizeof(servaddr_));
/* port_ = g_pikaConf->port();
* The usual bind, listen process
*/
port_ = gPikaConf->port();
servaddr_.sin_family = AF_INET; servaddr_.sin_family = AF_INET;
servaddr_.sin_addr.s_addr = htonl(INADDR_ANY); servaddr_.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr_.sin_port = htons(port_); servaddr_.sin_port = htons(port_);
bind(sockfd_, (struct sockaddr *) &servaddr_, sizeof(servaddr_)); bind(sockfd_, (struct sockaddr *) &servaddr_, sizeof(servaddr_));
listen(sockfd_, 10); listen(sockfd_, 10);
SetBlockType(kNonBlock); SetBlockType(kNonBlock);
// init pika epoll // init pika epoll
pikaEpoll_ = new PikaEpoll(); pikaEpoll_ = new PikaEpoll();
pikaEpoll_->PikaAddEvent(sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP); pikaEpoll_->PikaAddEvent(sockfd_, EPOLLIN | EPOLLERR | EPOLLHUP);
last_thread_ = 0; last_thread_ = 0;
for (int i = 0; i < gPikaConf->thread_num(); i++) { for (int i = 0; i < g_pikaConf->thread_num(); i++) {
pikaThread_[i] = new PikaThread(); pikaThread_[i] = new PikaThread();
} }
options_.create_if_missing = true; options_.create_if_missing = true;
leveldb::Status s = leveldb::DB::Open(options_, gPikaConf->data_path(), &db_); options_.write_buffer_size = 1500000000;
// leveldb::Status s = leveldb::DB::Open(options_, "/tmp/testdb", &db_);
leveldb::Status s = leveldb::DB::Open(options_, "/tmp/testdb", &db_);
if (!s.ok()) { if (!s.ok()) {
log_err("Open db failed"); log_err("Open db failed");
} }
// start the pikaThread_ thread // start the pikaThread_ thread
for (int i = 0; i < gPikaConf->thread_num(); i++) { for (int i = 0; i < g_pikaConf->thread_num(); i++) {
PikaThread::CreateThread(pikaThread_[i]->thread_id_, pikaThread_[i]); pthread_create(&(pikaThread_[i]->thread_id_), NULL, &(PikaServer::StartThread), pikaThread_[i]);
/*
* pthread_create(&(pikaThread_[i]->thread_id_), NULL, &(PikaThread::StartThread), pikaThread_[i]);
*/
} }
} }
PikaWorker::~PikaWorker() PikaServer::~PikaServer()
{ {
for (int i = 0; i < PIKA_THREAD_NUM; i++) { for (int i = 0; i < g_pikaConf->thread_num(); i++) {
delete(pikaThread_[i]); delete(pikaThread_[i]);
} }
delete(pikaEpoll_); delete(pikaEpoll_);
close(sockfd_); close(sockfd_);
} }
void* PikaServer::StartThread(void* arg)
{
reinterpret_cast<PikaThread*>(arg)->RunProcess();
return NULL;
}
void PikaWorker::Start() void PikaServer::RunProcess()
{ {
int nfds; int nfds;
PikaFiredEvent *tfe; PikaFiredEvent *tfe;
...@@ -112,7 +113,7 @@ void PikaWorker::Start() ...@@ -112,7 +113,7 @@ void PikaWorker::Start()
} }
write(pikaThread_[last_thread_]->notify_send_fd(), "", 1); write(pikaThread_[last_thread_]->notify_send_fd(), "", 1);
last_thread_++; last_thread_++;
last_thread_ %= PIKA_THREAD_NUM; last_thread_ %= g_pikaConf->thread_num();
} else if ((tfe + i)->mask_ & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { } else if ((tfe + i)->mask_ & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
LOG(WARNING) << "Epoll timeout event"; LOG(WARNING) << "Epoll timeout event";
} }
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "status.h" #include "status.h"
#include "pika_conn.h" #include "pika_conn.h"
extern PikaConf* gPikaConf; extern PikaConf* g_pikaConf;
PikaThread::PikaThread() PikaThread::PikaThread()
{ {
...@@ -37,29 +37,8 @@ PikaThread::~PikaThread() ...@@ -37,29 +37,8 @@ PikaThread::~PikaThread()
close(notify_receive_fd_); close(notify_receive_fd_);
} }
void PikaThread::CreateThread(pthread_t &pid, PikaThread* pikaThread)
{
pthread_create(&pid, NULL, &(PikaThread::StartThread), pikaThread);
return ;
}
void* PikaThread::StartThread(void* arg)
{
reinterpret_cast<PikaThread*>(arg)->RunProcess();
return NULL;
}
void PikaThread::RunProcess() void PikaThread::RunProcess()
{ {
/*
* These parameters used to get peer host and port
*/
struct sockaddr_in peer;
socklen_t pLen = sizeof(peer);
char buff[PIKA_NAME_LEN];
struct sockaddr_in servaddr_;
thread_id_ = pthread_self(); thread_id_ = pthread_self();
int nfds; int nfds;
PikaFiredEvent *tfe = NULL; PikaFiredEvent *tfe = NULL;
...@@ -68,12 +47,10 @@ void PikaThread::RunProcess() ...@@ -68,12 +47,10 @@ void PikaThread::RunProcess()
PikaConn *inConn; PikaConn *inConn;
for (;;) { for (;;) {
nfds = pikaEpoll_->PikaPoll(); nfds = pikaEpoll_->PikaPoll();
/* // log_info("nfds %d", nfds);
* log_info("nfds %d", nfds);
*/
for (int i = 0; i < nfds; i++) { for (int i = 0; i < nfds; i++) {
tfe = (pikaEpoll_->firedevent()) + i; tfe = (pikaEpoll_->firedevent()) + i;
log_info("tfe->fd_ %d tfe->mask_ %d", tfe->fd_, tfe->mask_); // log_info("tfe->fd_ %d tfe->mask_ %d", tfe->fd_, tfe->mask_);
if (tfe->fd_ == notify_receive_fd_ && (tfe->mask_ & EPOLLIN)) { if (tfe->fd_ == notify_receive_fd_ && (tfe->mask_ & EPOLLIN)) {
read(notify_receive_fd_, bb, 1); read(notify_receive_fd_, bb, 1);
{ {
...@@ -86,7 +63,7 @@ void PikaThread::RunProcess() ...@@ -86,7 +63,7 @@ void PikaThread::RunProcess()
conns_[ti.fd()] = tc; conns_[ti.fd()] = tc;
pikaEpoll_->PikaAddEvent(ti.fd(), EPOLLIN); pikaEpoll_->PikaAddEvent(ti.fd(), EPOLLIN);
log_info("receive one fd %d", ti.fd()); // log_info("receive one fd %d", ti.fd());
/* /*
* tc->set_thread(this); * tc->set_thread(this);
*/ */
...@@ -94,42 +71,40 @@ void PikaThread::RunProcess() ...@@ -94,42 +71,40 @@ void PikaThread::RunProcess()
int shouldClose = 0; int shouldClose = 0;
if (tfe->mask_ & EPOLLIN) { if (tfe->mask_ & EPOLLIN) {
inConn = conns_[tfe->fd_]; inConn = conns_[tfe->fd_];
getpeername(tfe->fd_, (sockaddr*) &peer, &pLen); // log_info("come if readable %d", (inConn == NULL));
inet_ntop(AF_INET, &peer.sin_addr, buff, sizeof(buff));
log_info("buff %s port %d", buff, peer.sin_port);
log_info("come if readable %d", (inConn == NULL));
if (inConn == NULL) { if (inConn == NULL) {
continue; continue;
} }
if (inConn->PikaGetRequest() == 0) { int ret = inConn->PikaGetRequest();
if (ret == 1) {
pikaEpoll_->PikaModEvent(tfe->fd_, 0, EPOLLOUT); pikaEpoll_->PikaModEvent(tfe->fd_, 0, EPOLLOUT);
} else if (ret == 0) {
pikaEpoll_->PikaModEvent(tfe->fd_, 0, EPOLLIN);
} else { } else {
delete(inConn); delete(inConn);
shouldClose = 1; shouldClose = 1;
} }
} }
log_info("tfe mask %d %d %d", tfe->mask_, EPOLLIN, EPOLLOUT); // log_info("tfe mask %d %d %d", tfe->mask_, EPOLLIN, EPOLLOUT);
if (tfe->mask_ & EPOLLOUT) { if (tfe->mask_ & EPOLLOUT) {
log_info("Come in the EPOLLOUT branch"); // log_info("Come in the EPOLLOUT branch");
inConn = conns_[tfe->fd_]; inConn = conns_[tfe->fd_];
if (inConn == NULL) { if (inConn == NULL) {
continue; continue;
} }
if (inConn->PikaSendReply() == 0) { if (inConn->PikaSendReply() == 0) {
log_info("SendReply ok"); // log_info("SendReply ok");
pikaEpoll_->PikaModEvent(tfe->fd_, 0, EPOLLIN); pikaEpoll_->PikaModEvent(tfe->fd_, 0, EPOLLIN);
} }
} }
if ((tfe->mask_ & EPOLLERR) || (tfe->mask_ & EPOLLHUP)) { if ((tfe->mask_ & EPOLLERR) || (tfe->mask_ & EPOLLHUP)) {
log_info("close tfe fd here"); // log_info("close tfe fd here");
close(tfe->fd_); close(tfe->fd_);
} }
if (shouldClose) { if (shouldClose) {
log_info("close tfe fd here"); // log_info("close tfe fd here");
close(tfe->fd_); close(tfe->fd_);
} }
} }
} }
} }
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
#include "pika_define.h"
int Setnonblocking(int sockfd) int Setnonblocking(int sockfd)
...@@ -20,30 +21,3 @@ int Setnonblocking(int sockfd) ...@@ -20,30 +21,3 @@ int Setnonblocking(int sockfd)
return flags; return flags;
} }
int SetBlockType(int fd, int& flags, BlockType type)
{
if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
close(fd);
return -1;
}
if (type == kBlock) {
flags &= (~O_NONBLOCK);
} else if (type == kNonBlock) {
flags |= O_NONBLOCK;
}
if (fcntl(fd, F_SETFL, flags) < 0) {
close(fd);
return -1;
}
return 0;
}
int SafeClose(int fd)
{
if (fd >= 0) {
close(fd);
fd = -1;
return 0;
}
return -1;
}
文件模式从 100755 更改为 100644
此差异已折叠。
...@@ -3,73 +3,70 @@ ...@@ -3,73 +3,70 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stdio.h> #include <stdio.h>
#include <stdint.h>
#include "status.h" #include "status.h"
const char* Status::CopyState(const char* state) { const char* Status::CopyState(const char* state) {
uint32_t size; uint32_t size;
memcpy(&size, state, sizeof(size)); memcpy(&size, state, sizeof(size));
char* result = new char[size + 5]; char* result = new char[size + 5];
memcpy(result, state, size + 5); memcpy(result, state, size + 5);
return result; return result;
} }
Status::Status(Code code, const Slice& msg, const Slice& msg2) { Status::Status(Code code, const Slice& msg, const Slice& msg2) {
assert(code != kOk); assert(code != kOk);
const uint32_t len1 = msg.size(); const uint32_t len1 = msg.size();
const uint32_t len2 = msg2.size(); const uint32_t len2 = msg2.size();
const uint32_t size = len1 + (len2 ? (2 + len2) : 0); const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
char* result = new char[size + 5]; char* result = new char[size + 5];
memcpy(result, &size, sizeof(size)); memcpy(result, &size, sizeof(size));
result[4] = static_cast<char>(code); result[4] = static_cast<char>(code);
memcpy(result + 5, msg.data(), len1); memcpy(result + 5, msg.data(), len1);
if (len2) { if (len2) {
result[5 + len1] = ':'; result[5 + len1] = ':';
result[6 + len1] = ' '; result[6 + len1] = ' ';
memcpy(result + 7 + len1, msg2.data(), len2); memcpy(result + 7 + len1, msg2.data(), len2);
} }
state_ = result; state_ = result;
} }
std::string Status::ToString() const { std::string Status::ToString() const {
if (state_ == NULL) { if (state_ == NULL) {
return "OK"; return "OK";
} else { } else {
char tmp[30]; char tmp[30];
const char* type; const char* type;
switch (code()) { switch (code()) {
case kOk: case kOk:
type = "OK"; type = "OK";
break; break;
case kNotFound: case kNotFound:
type = "NotFound: "; type = "NotFound: ";
break; break;
case kCorruption: case kCorruption:
type = "Corruption: "; type = "Corruption: ";
break; break;
case kNotSupported: case kNotSupported:
type = "Not implemented: "; type = "Not implemented: ";
break; break;
case kInvalidArgument: case kInvalidArgument:
type = "Invalid argument: "; type = "Invalid argument: ";
break; break;
case kIOError: case kIOError:
type = "IO error: "; type = "IO error: ";
break; break;
case kEndFile: default:
type = "End file: "; snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
break; static_cast<int>(code()));
default: type = tmp;
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", break;
static_cast<int>(code()));
type = tmp;
break;
}
std::string result(type);
uint32_t length;
memcpy(&length, state_, sizeof(length));
result.append(state_ + 5, length);
return result;
} }
std::string result(type);
uint32_t length;
memcpy(&length, state_, sizeof(length));
result.append(state_ + 5, length);
return result;
}
} }
此差异已折叠。
此差异已折叠。
Subproject commit 0b0b022be1c9c9139955af578fe477529d4b7b3c
此差异已折叠。
libglog.so.0.0.0
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册