提交 8e3b1d96 编写于 作者: 忠阳

change __thread lmb to LogMsgBuf class,set it in toString

上级 0f25623c
...@@ -7,9 +7,6 @@ endif() ...@@ -7,9 +7,6 @@ endif()
add_compile_definitions($<$<COMPILE_LANGUAGE:CXX>:_GLIBCXX_USE_CXX11_ABI=0>) add_compile_definitions($<$<COMPILE_LANGUAGE:CXX>:_GLIBCXX_USE_CXX11_ABI=0>)
option(HIDDEN_DMB_SYMBOLS "hidden dmb symbols" FALSE)
mark_as_advanced(HIDDEN_DMB_SYMBOLS)
add_subdirectory(src) add_subdirectory(src)
# Check if oblogmsg is being used directly or via add_subdirectory # Check if oblogmsg is being used directly or via add_subdirectory
......
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
#endif #endif
#include "itoa.h" #include "itoa.h"
#include "StrArray.h" #include "StrArray.h"
using namespace std; using namespace std;
using namespace oceanbase::logmessage; using namespace oceanbase::logmessage;
LogMsgBuf* lmb = NULL;
const char* tableMeta = "[MYTEST,litter]\n" const char* tableMeta = "[MYTEST,litter]\n"
"id,MYSQL_TYPE_LONG,8,P,\n" "id,MYSQL_TYPE_LONG,8,P,\n"
"num1,MYSQL_TYPE_SHORT,4,,\n" "num1,MYSQL_TYPE_SHORT,4,,\n"
...@@ -618,7 +618,7 @@ LogRecordImpl* createRecordByMeta(ITableMeta* m, int type, bool string_or_point, ...@@ -618,7 +618,7 @@ LogRecordImpl* createRecordByMeta(ITableMeta* m, int type, bool string_or_point,
#ifdef LMB #ifdef LMB
LogRecordImpl* r = new LogRecordImpl(true, true); LogRecordImpl* r = new LogRecordImpl(true, true);
#else #else
LogRecordImpl* r = new LogRecordImpl(true, true); LogRecordImpl* r = new LogRecordImpl(true, false);
#endif #endif
char *buf = NULL, *pos = NULL; char *buf = NULL, *pos = NULL;
int size = 0, csize = 0; int size = 0, csize = 0;
...@@ -776,7 +776,7 @@ int dm_speed_test(int type, bool string_or_point) ...@@ -776,7 +776,7 @@ int dm_speed_test(int type, bool string_or_point)
clock_gettime(CLOCK_REALTIME, &s); clock_gettime(CLOCK_REALTIME, &s);
for (int i = 0; i < sc; i++) { for (int i = 0; i < sc; i++) {
#ifdef LMB #ifdef LMB
lrl[i]->toString(&size); lrl[i]->toString(&size,lmb);
#else #else
lrl[i]->toString(); lrl[i]->toString();
#endif #endif
...@@ -818,13 +818,13 @@ int dm_speed_test(int type, bool string_or_point) ...@@ -818,13 +818,13 @@ int dm_speed_test(int type, bool string_or_point)
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
#ifdef LMB #ifdef LMB
LogMsgInit(); lmb = new LogMsgBuf();
#endif #endif
dm_speed_test(EINSERT, false); dm_speed_test(EINSERT, false);
dm_speed_test(EUPDATE, false); dm_speed_test(EUPDATE, false);
dm_speed_test(EDELETE, false); dm_speed_test(EDELETE, false);
#ifdef LMB #ifdef LMB
LogMsgDestroy(); delete lmb;
#endif #endif
return 0; return 0;
} }
...@@ -19,24 +19,35 @@ namespace oceanbase { ...@@ -19,24 +19,35 @@ namespace oceanbase {
namespace logmessage { namespace logmessage {
struct BinLogBuf; struct BinLogBuf;
int LogMsgInit(); class LogMsgBuf {
int LogMsgLocalInit(); public:
void LogMsgDestroy(); LogMsgBuf();
void LogMsgLocalDestroy(); ~LogMsgBuf();
const char* LogMsgGetValueByOffset(size_t offset); const char* getValueByOffset(size_t offset);
size_t LogMsgAppendString(const char* string, size_t size); size_t appendString(const char* string, size_t size);
size_t LogMsgAppendString(const std::string& string); size_t appendString(const std::string& string);
size_t LogMsgAppendBuf(const char* data, size_t size); size_t appendBuf(const char* data, size_t size);
size_t LogMsgAppendBuf(const std::string& string); size_t appendBuf(const std::string& string);
size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size); size_t appendBuf(const BinLogBuf* sa, size_t size);
size_t LogMsgAppendStringArray(std::vector<std::string*>& sa); size_t appendStringArray(std::vector<std::string*>& sa);
size_t LogMsgAppendStringArray(const char** sa, size_t size); size_t appendStringArray(const char** sa, size_t size);
void LogMsgSetHead(size_t size); void setHead(size_t size);
void LogMsgCopyHead(const char* head, size_t size); void copyHead(const char* head, size_t size);
void LogMsgFroceSetHeadSize(size_t size); void froceSetHeadSize(size_t size);
const char* LogMsgGetString(size_t* size); const char* getString(size_t* size);
size_t LogMsgAppendDataArray(std::vector<long>& sa); size_t appendDataArray(std::vector<long>& sa);
size_t LogMsgAppendDataArray(uint8_t* sa, size_t size); size_t appendDataArray(uint8_t* sa, size_t size);
private:
inline void checkBuf(size_t size, char*& pos, uint32_t*& s, char*& head);
private:
char* buf;
size_t bufSize;
size_t bufPos;
char* defaultBuf;
size_t avg_size;
};
} // namespace logmessage } // namespace logmessage
} // namespace oceanbase } // namespace oceanbase
...@@ -15,6 +15,7 @@ See the Mulan PSL v2 for more details. */ ...@@ -15,6 +15,7 @@ See the Mulan PSL v2 for more details. */
#include <cstdint> #include <cstdint>
#include "UserData.h" #include "UserData.h"
#include "BinLogBuf.h" #include "BinLogBuf.h"
#include "LogMsgBuf.h"
namespace oceanbase { namespace oceanbase {
namespace logmessage { namespace logmessage {
...@@ -234,7 +235,7 @@ public: ...@@ -234,7 +235,7 @@ public:
* serialize * serialize
* @return serialized ptr * @return serialized ptr
*/ */
virtual const char* toString(size_t* size, bool reserveMemory = false) = 0; virtual const char* toString(size_t* size, LogMsgBuf* lmb = NULL, bool reserveMemory = false) = 0;
/** /**
* @return serialized ptr * @return serialized ptr
...@@ -272,7 +273,7 @@ class LogRecordImpl : public ILogRecord { ...@@ -272,7 +273,7 @@ class LogRecordImpl : public ILogRecord {
public: public:
LogRecordImpl(time_t timestamp, ITableMeta* tblMeta); LogRecordImpl(time_t timestamp, ITableMeta* tblMeta);
LogRecordImpl(const void* ptr, size_t size); LogRecordImpl(const void* ptr, size_t size);
LogRecordImpl(bool creating = true, bool useDMB = false); LogRecordImpl(bool creating = true, bool useLMB = false);
virtual ~LogRecordImpl(); virtual ~LogRecordImpl();
public: public:
...@@ -403,7 +404,7 @@ public: ...@@ -403,7 +404,7 @@ public:
virtual size_t getRealSize(); virtual size_t getRealSize();
virtual const char* toString(size_t* size, bool reserveMemory = false); virtual const char* toString(size_t* size, LogMsgBuf* lmb = NULL, bool reserveMemory = false);
virtual const char* getFormatedString(size_t* size); virtual const char* getFormatedString(size_t* size);
......
...@@ -21,7 +21,6 @@ foreach(T IN ITEMS oblogmsg_shared oblogmsg_static) ...@@ -21,7 +21,6 @@ foreach(T IN ITEMS oblogmsg_shared oblogmsg_static)
target_include_directories(${T} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} target_include_directories(${T} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>) PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
endforeach() endforeach()
target_compile_definitions(oblogmsg_static PRIVATE $<$<BOOL:${HIDDEN_DMB_SYMBOLS}>:_HIDDEN_DMB_SYMBOLS>)
# ensure that the build results can be run on systems with lower libstdc++ version than the build system # ensure that the build results can be run on systems with lower libstdc++ version than the build system
target_link_libraries(oblogmsg_shared PRIVATE -static-libstdc++) target_link_libraries(oblogmsg_shared PRIVATE -static-libstdc++)
......
...@@ -19,161 +19,107 @@ See the Mulan PSL v2 for more details. */ ...@@ -19,161 +19,107 @@ See the Mulan PSL v2 for more details. */
namespace oceanbase { namespace oceanbase {
namespace logmessage { namespace logmessage {
#ifdef _HIDDEN_DMB_SYMBOLS
#define VISIBILITY_HIDDEN __attribute__((visibility("hidden")))
#else
#define VISIBILITY_HIDDEN
#endif
typedef struct _LogMsgBuf {
char* buf;
size_t bufSize;
size_t bufPos;
char* defaultBuf;
size_t avg_size;
} LogMsgBuf;
#define LogMsgHeadSize (sizeof(MsgHeader) + sizeof(COUNT_TYPE)) #define LogMsgHeadSize (sizeof(MsgHeader) + sizeof(COUNT_TYPE))
#define DefaultLogMsgBufSize 1024 * 1024 * 32 #define DefaultLogMsgBufSize 1024 * 1024 * 32
#define LOG_MSG_MAX_TAIL_SIZE 1024 #define LOG_MSG_MAX_TAIL_SIZE 1024
static LogMsgBuf lmb_global = {NULL, 0, 0, NULL, 0}; inline void LogMsgBuf::checkBuf(size_t size, char*& pos, STRLEN_TYPE*& s, char*& head)
static __thread LogMsgBuf* lmb = &lmb_global;
static inline void lm_check_buf(LogMsgBuf* lmbuf, size_t size, char*& pos, STRLEN_TYPE*& s, char*& head)
{
if (lmbuf->bufSize < size + (pos - lmbuf->buf) + LOG_MSG_MAX_TAIL_SIZE) {
char* tmp = new char[lmbuf->bufSize = (pos - lmbuf->buf) + 2 * (size) + LOG_MSG_MAX_TAIL_SIZE];
memcpy(tmp, lmbuf->buf, pos - lmbuf->buf);
pos = tmp + (pos - lmbuf->buf);
s = (STRLEN_TYPE*)(tmp + ((char*)s - lmbuf->buf));
head = tmp + ((char*)(head)-lmbuf->buf);
if (lmbuf->buf != lmbuf->defaultBuf)
delete[] lmbuf->buf;
lmbuf->buf = tmp;
}
}
VISIBILITY_HIDDEN int LogMsgLocalInit()
{ {
LogMsgLocalDestroy(); if (bufSize < size + (pos - buf) + LOG_MSG_MAX_TAIL_SIZE) {
if (NULL == (lmb = new LogMsgBuf)) char* tmp = new char[bufSize = (pos - buf) + 2 * (size) + LOG_MSG_MAX_TAIL_SIZE];
return -1; memcpy(tmp, buf, pos - buf);
if (NULL == (lmb->buf = new char[lmb->bufSize = DefaultLogMsgBufSize])) { pos = tmp + (pos - buf);
delete lmb; s = (STRLEN_TYPE*)(tmp + ((char*)s - buf));
lmb = &lmb_global; head = tmp + ((char*)(head)-buf);
return -1; if (buf != defaultBuf)
delete[] buf;
buf = tmp;
} }
lmb->defaultBuf = lmb->buf;
lmb->bufPos = 0;
return 0;
} }
VISIBILITY_HIDDEN void LogMsgLocalDestroy() LogMsgBuf::LogMsgBuf()
{ {
if (lmb == &lmb_global) buf = new char[bufSize = DefaultLogMsgBufSize];
return; defaultBuf = buf;
bufPos = 0;
if (lmb != NULL) {
if (lmb->buf != NULL)
delete[] lmb->buf;
if (lmb->defaultBuf != lmb->buf && lmb->defaultBuf != NULL)
delete[] lmb->defaultBuf;
delete lmb;
lmb = &lmb_global;
}
}
VISIBILITY_HIDDEN int LogMsgInit()
{
if (lmb != &lmb_global) {
LogMsgLocalDestroy();
lmb = &lmb_global;
}
LogMsgDestroy();
if (NULL == (lmb_global.buf = new char[lmb_global.bufSize = DefaultLogMsgBufSize]))
return -1;
lmb_global.defaultBuf = lmb_global.buf;
lmb_global.bufPos = 0;
return 0;
} }
VISIBILITY_HIDDEN void LogMsgDestroy() LogMsgBuf::~LogMsgBuf()
{ {
if (lmb != &lmb_global) if (buf != NULL)
return; delete[] buf;
if (lmb->buf != NULL) if (defaultBuf != buf && defaultBuf !=NULL)
delete[] lmb->buf; delete[] defaultBuf;
if (lmb->defaultBuf != lmb->buf && lmb->defaultBuf != NULL) buf = NULL;
delete[] lmb->defaultBuf; defaultBuf = NULL;
lmb->buf = lmb->defaultBuf = NULL; bufPos = 0;
} }
VISIBILITY_HIDDEN const char* LogMsgGetValueByOffset(size_t offset) const char* LogMsgBuf::getValueByOffset(size_t offset)
{ {
return lmb->buf + offset + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE); return buf + offset + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE);
} }
VISIBILITY_HIDDEN size_t LogMsgAppendString(const char* string, size_t size) size_t LogMsgBuf::appendString(const char* string, size_t size)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING);
if (string == NULL) { if (string == NULL) {
(*(DT_TYPE*)(lmb->buf + lmb->bufPos)) |= DC_NULL; (*(DT_TYPE*)(buf + bufPos)) |= DC_NULL;
return (lmb->bufPos += sizeof(DT_TYPE)); return (bufPos += sizeof(DT_TYPE));
} }
*(STRLEN_TYPE*)(lmb->buf + lmb->bufPos + sizeof(DT_TYPE)) = size + 1; *(STRLEN_TYPE*)(buf + bufPos + sizeof(DT_TYPE)) = size + 1;
toLeEndian(lmb->buf + lmb->bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE)); toLeEndian(buf + bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE));
memcpy(lmb->buf + lmb->bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string, size); memcpy(buf + bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string, size);
lmb->bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size) + 1; bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size) + 1;
lmb->buf[lmb->bufPos - 1] = 0; buf[bufPos - 1] = 0;
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendString(const std::string& string) size_t LogMsgBuf::appendString(const std::string& string)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize, size = string.size(); size_t offset = bufPos - LogMsgHeadSize, size = string.size();
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING);
if (size == 0) { if (size == 0) {
(*(DT_TYPE*)(lmb->buf + lmb->bufPos)) |= DC_NULL; (*(DT_TYPE*)(buf + bufPos)) |= DC_NULL;
return (lmb->bufPos += sizeof(DT_TYPE)); return (bufPos += sizeof(DT_TYPE));
} }
*(STRLEN_TYPE*)(lmb->buf + lmb->bufPos + sizeof(DT_TYPE)) = size + 1; *(STRLEN_TYPE*)(buf + bufPos + sizeof(DT_TYPE)) = size + 1;
toLeEndian(lmb->buf + lmb->bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE)); toLeEndian(buf + bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE));
strncpy(lmb->buf + lmb->bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string.c_str(), size + 1); strncpy(buf + bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string.c_str(), size + 1);
lmb->bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size + 1); bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size + 1);
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const char* data, size_t size) size_t LogMsgBuf::appendBuf(const char* data, size_t size)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
memcpy(lmb->buf + lmb->bufPos, data, size); memcpy(buf + bufPos, data, size);
lmb->bufPos += size; bufPos += size;
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const std::string& string) size_t LogMsgBuf::appendBuf(const std::string& string)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
memcpy(lmb->buf + lmb->bufPos, string.c_str(), string.size()); memcpy(buf + bufPos, string.c_str(), string.size());
lmb->bufPos += string.size(); bufPos += string.size();
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size) size_t LogMsgBuf::appendBuf(const BinLogBuf* sa, size_t size)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE); bufPos += sizeof(DT_TYPE);
if (sa == NULL || size == 0) { if (sa == NULL || size == 0) {
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(0); *(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(0);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE)); toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
return offset; return offset;
} }
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(size); *(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(size);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE)); toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
COUNT_TYPE i = 0; COUNT_TYPE i = 0;
char *pos = lmb->buf + lmb->bufPos, *head; char *pos = buf + bufPos, *head;
STRLEN_TYPE* s = (STRLEN_TYPE*)pos; STRLEN_TYPE* s = (STRLEN_TYPE*)pos;
pos += sizeof(STRLEN_TYPE) * (size + 1); pos += sizeof(STRLEN_TYPE) * (size + 1);
head = pos; head = pos;
...@@ -181,7 +127,7 @@ VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size) ...@@ -181,7 +127,7 @@ VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size)
s[i] = pos - head; s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE)); toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
if (sa[i].buf != NULL) { if (sa[i].buf != NULL) {
lm_check_buf(lmb, sa[i].buf_used_size, pos, s, head); checkBuf(sa[i].buf_used_size, pos, s, head);
memcpy(pos, sa[i].buf, sa[i].buf_used_size); memcpy(pos, sa[i].buf, sa[i].buf_used_size);
pos[sa[i].buf_used_size] = 0; pos[sa[i].buf_used_size] = 0;
pos += (sa[i].buf_used_size + 1); pos += (sa[i].buf_used_size + 1);
...@@ -189,24 +135,24 @@ VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size) ...@@ -189,24 +135,24 @@ VISIBILITY_HIDDEN size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size)
} }
s[i] = pos - head; s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE)); toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
lmb->bufPos = pos - lmb->buf; bufPos = pos - buf;
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(std::vector<std::string*>& sa) size_t LogMsgBuf::appendStringArray(std::vector<std::string*>& sa)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize, size = sa.size(); size_t offset = bufPos - LogMsgHeadSize, size = sa.size();
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE); bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(sa.size()); *(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(sa.size());
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE)); toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(sa.size()) == 0) { if ((COUNT_TYPE)(sa.size()) == 0) {
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
return offset; return offset;
} }
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
STRLEN_TYPE len; STRLEN_TYPE len;
COUNT_TYPE i = 0; COUNT_TYPE i = 0;
char *pos = lmb->buf + lmb->bufPos, *head; char *pos = buf + bufPos, *head;
STRLEN_TYPE* s = (STRLEN_TYPE*)pos; STRLEN_TYPE* s = (STRLEN_TYPE*)pos;
pos += sizeof(STRLEN_TYPE) * (size + 1); pos += sizeof(STRLEN_TYPE) * (size + 1);
head = pos; head = pos;
...@@ -215,7 +161,7 @@ VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(std::vector<std::string*>& sa) ...@@ -215,7 +161,7 @@ VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(std::vector<std::string*>& sa)
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE)); toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
if (sa[i] != NULL) { if (sa[i] != NULL) {
len = sa[i]->size(); len = sa[i]->size();
lm_check_buf(lmb, len, pos, s, head); checkBuf(len, pos, s, head);
memcpy(pos, sa[i]->c_str(), len); memcpy(pos, sa[i]->c_str(), len);
pos[len] = 0; pos[len] = 0;
pos += (len + 1); pos += (len + 1);
...@@ -223,24 +169,24 @@ VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(std::vector<std::string*>& sa) ...@@ -223,24 +169,24 @@ VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(std::vector<std::string*>& sa)
} }
s[i] = pos - head; s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE)); toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
lmb->bufPos = pos - lmb->buf; bufPos = pos - buf;
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(const char** sa, size_t size) size_t LogMsgBuf::appendStringArray(const char** sa, size_t size)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE); bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(size); *(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(size);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE)); toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(size) == 0) { if ((COUNT_TYPE)(size) == 0) {
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
return offset; return offset;
} }
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
STRLEN_TYPE len; STRLEN_TYPE len;
COUNT_TYPE i = 0; COUNT_TYPE i = 0;
char *pos = lmb->buf + lmb->bufPos, *head; char *pos = buf + bufPos, *head;
STRLEN_TYPE* s = (STRLEN_TYPE*)pos; STRLEN_TYPE* s = (STRLEN_TYPE*)pos;
pos += sizeof(STRLEN_TYPE) * (size + 1); pos += sizeof(STRLEN_TYPE) * (size + 1);
head = pos; head = pos;
...@@ -249,104 +195,104 @@ VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(const char** sa, size_t size) ...@@ -249,104 +195,104 @@ VISIBILITY_HIDDEN size_t LogMsgAppendStringArray(const char** sa, size_t size)
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE)); toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
if (sa[i] != NULL) { if (sa[i] != NULL) {
len = strlen(sa[i]) + 1; len = strlen(sa[i]) + 1;
lm_check_buf(lmb, len, pos, s, head); checkBuf(len, pos, s, head);
memcpy(pos, sa[i], len); memcpy(pos, sa[i], len);
pos += len; pos += len;
} }
} }
s[i] = pos - head; s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE)); toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
lmb->bufPos = pos - lmb->buf; bufPos = pos - buf;
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendDataArray(std::vector<long>& sa) size_t LogMsgBuf::appendDataArray(std::vector<long>& sa)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_INT64 | DC_ARRAY); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_INT64 | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE); bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(sa.size()); *(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(sa.size());
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE)); toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(sa.size()) == 0) { if ((COUNT_TYPE)(sa.size()) == 0) {
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
return offset; return offset;
} }
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
COUNT_TYPE i = 0, j = sa.size(); COUNT_TYPE i = 0, j = sa.size();
char* pos = lmb->buf + lmb->bufPos; char* pos = buf + bufPos;
pos += sizeof(STRLEN_TYPE) * i; pos += sizeof(STRLEN_TYPE) * i;
for (i = 0; i < j; ++i) { for (i = 0; i < j; ++i) {
*(long*)pos = sa[i]; *(long*)pos = sa[i];
toLeEndian(pos, sizeof(long)); toLeEndian(pos, sizeof(long));
pos += sizeof(long); pos += sizeof(long);
} }
lmb->bufPos = pos - lmb->buf; bufPos = pos - buf;
return offset; return offset;
} }
VISIBILITY_HIDDEN size_t LogMsgAppendDataArray(uint8_t* sa, size_t size) size_t LogMsgBuf::appendDataArray(uint8_t* sa, size_t size)
{ {
size_t offset = lmb->bufPos - LogMsgHeadSize; size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_UINT8 | DC_ARRAY); *(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_UINT8 | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE); bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(size); *(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(size);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE)); toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(size) == 0 || sa == NULL) { if ((COUNT_TYPE)(size) == 0 || sa == NULL) {
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
return offset; return offset;
} }
lmb->bufPos += sizeof(COUNT_TYPE); bufPos += sizeof(COUNT_TYPE);
memcpy(lmb->buf + lmb->bufPos, sa, size); memcpy(buf + bufPos, sa, size);
lmb->bufPos += size; bufPos += size;
return offset; return offset;
} }
VISIBILITY_HIDDEN void LogMsgSetHead(size_t size) void LogMsgBuf::setHead(size_t size)
{ {
lmb->bufPos = size + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE); bufPos = size + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE);
if (lmb->buf != lmb->defaultBuf) { if (buf != defaultBuf) {
if (lmb->avg_size > DefaultLogMsgBufSize) { if (avg_size > DefaultLogMsgBufSize) {
if (lmb->avg_size < ((lmb->bufSize + DefaultLogMsgBufSize) >> 1)) { if (avg_size < ((bufSize + DefaultLogMsgBufSize) >> 1)) {
delete[] lmb->buf; delete[] buf;
lmb->buf = new char[lmb->bufSize = (lmb->bufSize + DefaultLogMsgBufSize) >> 1]; buf = new char[bufSize = (bufSize + DefaultLogMsgBufSize) >> 1];
} }
} else { } else {
delete[] lmb->buf; delete[] buf;
lmb->buf = lmb->defaultBuf; buf = defaultBuf;
lmb->bufSize = DefaultLogMsgBufSize; bufSize = DefaultLogMsgBufSize;
} }
} }
} }
VISIBILITY_HIDDEN void LogMsgCopyHead(const char* head, size_t size) void LogMsgBuf::copyHead(const char* head, size_t size)
{ {
((struct MsgHeader*)lmb->buf)->m_msgType = MT_VAR; ((struct MsgHeader*)buf)->m_msgType = MT_VAR;
toLeEndian(&(((struct MsgHeader*)lmb->buf)->m_msgType), sizeof(uint16_t)); toLeEndian(&(((struct MsgHeader*)buf)->m_msgType), sizeof(uint16_t));
((struct MsgHeader*)lmb->buf)->m_version = 1; ((struct MsgHeader*)buf)->m_version = 1;
toLeEndian(&(((struct MsgHeader*)lmb->buf)->m_version), sizeof(uint16_t)); toLeEndian(&(((struct MsgHeader*)buf)->m_version), sizeof(uint16_t));
((struct MsgHeader*)lmb->buf)->m_size = lmb->bufPos - (sizeof(struct MsgHeader)); ((struct MsgHeader*)buf)->m_size = bufPos - (sizeof(struct MsgHeader));
toLeEndian(&(((struct MsgHeader*)lmb->buf)->m_size), sizeof(uint32_t)); toLeEndian(&(((struct MsgHeader*)buf)->m_size), sizeof(uint32_t));
*(COUNT_TYPE*)(lmb->buf + sizeof(struct MsgHeader)) = 0; *(COUNT_TYPE*)(buf + sizeof(struct MsgHeader)) = 0;
toLeEndian(lmb->buf + sizeof(struct MsgHeader), sizeof(COUNT_TYPE)); toLeEndian(buf + sizeof(struct MsgHeader), sizeof(COUNT_TYPE));
*(lmb->buf + LogMsgHeadSize) = (DT_TYPE)(DT_UINT8 | DC_ARRAY); *(buf + LogMsgHeadSize) = (DT_TYPE)(DT_UINT8 | DC_ARRAY);
toLeEndian(lmb->buf + LogMsgHeadSize, sizeof(DT_TYPE)); toLeEndian(buf + LogMsgHeadSize, sizeof(DT_TYPE));
*(COUNT_TYPE*)(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size; *(COUNT_TYPE*)(buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size;
toLeEndian(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE)); toLeEndian(buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE));
memcpy(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE), head, size); memcpy(buf + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE), head, size);
} }
VISIBILITY_HIDDEN void LogMsgFroceSetHeadSize(size_t size) void LogMsgBuf::froceSetHeadSize(size_t size)
{ {
*(COUNT_TYPE*)(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size; *(COUNT_TYPE*)(buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size;
toLeEndian(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE)); toLeEndian(buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE));
} }
VISIBILITY_HIDDEN const char* LogMsgGetString(size_t* size) const char* LogMsgBuf::getString(size_t* size)
{ {
*size = lmb->bufPos; *size = bufPos;
if (lmb->buf != lmb->defaultBuf && lmb->bufPos > DefaultLogMsgBufSize) if (buf != defaultBuf && bufPos > DefaultLogMsgBufSize)
lmb->avg_size = ((((long)lmb->avg_size >> 1) + (long)lmb->bufPos) >> 1) + (lmb->bufPos >> 2); avg_size = ((((long)avg_size >> 1) + (long)bufPos) >> 1) + (bufPos >> 2);
else else
lmb->avg_size = (((long)lmb->avg_size + (long)lmb->bufPos) >> 1); avg_size = (((long)avg_size + (long)bufPos) >> 1);
return lmb->buf; return buf;
} }
} // namespace logmessage } // namespace logmessage
......
...@@ -140,7 +140,7 @@ struct LogRecInfo { ...@@ -140,7 +140,7 @@ struct LogRecInfo {
string m_instance; string m_instance;
string m_encoding; string m_encoding;
string m_ob_trace_info; string m_ob_trace_info;
bool useDMB; bool useLMB;
bool m_reservedMemory; bool m_reservedMemory;
LogRecInfo(time_t timestamp, ITableMeta* tblMeta) LogRecInfo(time_t timestamp, ITableMeta* tblMeta)
: m_creatingMode(true), : m_creatingMode(true),
...@@ -152,7 +152,7 @@ struct LogRecInfo { ...@@ -152,7 +152,7 @@ struct LogRecInfo {
m_tblMeta(tblMeta), m_tblMeta(tblMeta),
m_dbMeta(NULL), m_dbMeta(NULL),
m_expiredMetaDataCollections(NULL), m_expiredMetaDataCollections(NULL),
useDMB(false), useLMB(false),
m_reservedMemory(false) m_reservedMemory(false)
{ {
m_posInfo = new PosOfLogMsg_vc; m_posInfo = new PosOfLogMsg_vc;
...@@ -187,7 +187,7 @@ struct LogRecInfo { ...@@ -187,7 +187,7 @@ struct LogRecInfo {
m_tblMeta(NULL), m_tblMeta(NULL),
m_dbMeta(NULL), m_dbMeta(NULL),
m_expiredMetaDataCollections(NULL), m_expiredMetaDataCollections(NULL),
useDMB(false), useLMB(false),
m_reservedMemory(false) m_reservedMemory(false)
{ {
m_lrDataArea = new MsgVarArea(false); m_lrDataArea = new MsgVarArea(false);
...@@ -201,7 +201,7 @@ struct LogRecInfo { ...@@ -201,7 +201,7 @@ struct LogRecInfo {
m_filter_max_count = FILTER_SIZE; m_filter_max_count = FILTER_SIZE;
} }
LogRecInfo(bool creating, bool useDMB = false) LogRecInfo(bool creating, bool useLMB = false)
: m_creatingMode(creating), : m_creatingMode(creating),
m_parsedOK(false), m_parsedOK(false),
m_tailParseOK(false), m_tailParseOK(false),
...@@ -224,9 +224,9 @@ struct LogRecInfo { ...@@ -224,9 +224,9 @@ struct LogRecInfo {
m_posInfo->m_id = LOGREC_INVALID_ID; m_posInfo->m_id = LOGREC_INVALID_ID;
m_posInfo->m_srcType = SRC_MYSQL; m_posInfo->m_srcType = SRC_MYSQL;
m_posInfo->m_srcCategory = SRC_FULL_RECORDED; m_posInfo->m_srcCategory = SRC_FULL_RECORDED;
this->useDMB = useDMB; this->useLMB = useLMB;
} else { } else {
this->useDMB = false; this->useLMB= false;
m_lrDataArea = new MsgVarArea(false); m_lrDataArea = new MsgVarArea(false);
} }
m_reservedMemory = false; m_reservedMemory = false;
...@@ -259,7 +259,7 @@ struct LogRecInfo { ...@@ -259,7 +259,7 @@ struct LogRecInfo {
void clear() void clear()
{ {
if (m_creatingMode) { if (m_creatingMode) {
if (!useDMB) { if (!useLMB) {
m_lrDataArea->clear(); m_lrDataArea->clear();
m_lrDataArea->appendArray((uint8_t*)m_posInfo, sizeof(PosOfLogMsg_vc)); m_lrDataArea->appendArray((uint8_t*)m_posInfo, sizeof(PosOfLogMsg_vc));
} else { } else {
...@@ -845,7 +845,7 @@ struct LogRecInfo { ...@@ -845,7 +845,7 @@ struct LogRecInfo {
void setInstance(const char* instance) void setInstance(const char* instance)
{ {
if (useDMB) { if (useLMB) {
SET_OR_CLEAR_STRING(m_instance, instance); SET_OR_CLEAR_STRING(m_instance, instance);
} else } else
((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = m_lrDataArea->appendString(instance); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = m_lrDataArea->appendString(instance);
...@@ -853,7 +853,7 @@ struct LogRecInfo { ...@@ -853,7 +853,7 @@ struct LogRecInfo {
const char* instance() const char* instance()
{ {
if (useDMB) if (useLMB)
return m_instance.c_str(); return m_instance.c_str();
size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance; size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance;
return m_lrDataArea->getString(offset); return m_lrDataArea->getString(offset);
...@@ -861,7 +861,7 @@ struct LogRecInfo { ...@@ -861,7 +861,7 @@ struct LogRecInfo {
void setDbname(const char* dbname) void setDbname(const char* dbname)
{ {
if (useDMB) { if (useLMB) {
SET_OR_CLEAR_STRING(m_dbName, dbname); SET_OR_CLEAR_STRING(m_dbName, dbname);
} else { } else {
// when m_creatingMode is false,before call function parse,m_posInfo maybe null // when m_creatingMode is false,before call function parse,m_posInfo maybe null
...@@ -873,7 +873,7 @@ struct LogRecInfo { ...@@ -873,7 +873,7 @@ struct LogRecInfo {
const char* dbname() const const char* dbname() const
{ {
if (useDMB) if (useLMB)
return m_dbName.c_str(); return m_dbName.c_str();
size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName; size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName;
return m_lrDataArea->getString(offset); return m_lrDataArea->getString(offset);
...@@ -881,7 +881,7 @@ struct LogRecInfo { ...@@ -881,7 +881,7 @@ struct LogRecInfo {
void setTbname(const char* tbname) void setTbname(const char* tbname)
{ {
if (useDMB) { if (useLMB) {
SET_OR_CLEAR_STRING(m_tbName, tbname); SET_OR_CLEAR_STRING(m_tbName, tbname);
} else { } else {
if (m_posInfo != NULL) { if (m_posInfo != NULL) {
...@@ -892,7 +892,7 @@ struct LogRecInfo { ...@@ -892,7 +892,7 @@ struct LogRecInfo {
const char* tbname() const const char* tbname() const
{ {
if (useDMB) if (useLMB)
return m_tbName.c_str(); return m_tbName.c_str();
size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName; size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName;
return m_lrDataArea->getString(offset); return m_lrDataArea->getString(offset);
...@@ -978,7 +978,7 @@ struct LogRecInfo { ...@@ -978,7 +978,7 @@ struct LogRecInfo {
void setRecordEncoding(const char* encoding) void setRecordEncoding(const char* encoding)
{ {
if (useDMB) if (useLMB)
SET_OR_CLEAR_STRING(m_encoding, encoding); SET_OR_CLEAR_STRING(m_encoding, encoding);
else else
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = m_lrDataArea->appendString(encoding); ((PosOfLogMsg_vc*)m_posInfo)->m_encoding = m_lrDataArea->appendString(encoding);
...@@ -1001,7 +1001,7 @@ struct LogRecInfo { ...@@ -1001,7 +1001,7 @@ struct LogRecInfo {
void setObTraceInfo(const char* ob_trace_info) void setObTraceInfo(const char* ob_trace_info)
{ {
if (useDMB) { if (useLMB) {
SET_OR_CLEAR_STRING(m_ob_trace_info, ob_trace_info); SET_OR_CLEAR_STRING(m_ob_trace_info, ob_trace_info);
} else { } else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfObTraceInfo = m_lrDataArea->appendString(ob_trace_info); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfObTraceInfo = m_lrDataArea->appendString(ob_trace_info);
...@@ -1011,7 +1011,7 @@ struct LogRecInfo { ...@@ -1011,7 +1011,7 @@ struct LogRecInfo {
const char* obTraceInfo() const char* obTraceInfo()
{ {
if (m_creatingMode || m_parsedOK) { if (m_creatingMode || m_parsedOK) {
if (useDMB) { if (useLMB) {
return m_ob_trace_info.c_str(); return m_ob_trace_info.c_str();
} else { } else {
if (GET_LOGREC_SUB_VERSION(m_posInfo->m_id) >= LOGREC_SUB_VERSION) { if (GET_LOGREC_SUB_VERSION(m_posInfo->m_id) >= LOGREC_SUB_VERSION) {
...@@ -1251,16 +1251,17 @@ struct LogRecInfo { ...@@ -1251,16 +1251,17 @@ struct LogRecInfo {
* *
* @return the pointer to the serialized string, NULL is returned if failed * @return the pointer to the serialized string, NULL is returned if failed
*/ */
const char* toString(size_t* size, bool reserveMemory = false) const char* toString(size_t* size, LogMsgBuf* lmb = NULL, bool reserveMemory = false)
{ {
if (!m_creatingMode) { if (!m_creatingMode) {
return getSerializedString(size); return getSerializedString(size);
} }
/* Always use the latest version to do the serialization */ /* Always use the latest version to do the serialization */
int colCount = 0; int colCount = 0;
uint8_t op = m_posInfo->m_op; uint8_t op = m_posInfo->m_op;
if (useDMB) if (useLMB)
LogMsgSetHead(sizeof(PosOfLogMsg_vc)); lmb->setHead(sizeof(PosOfLogMsg_vc));
switch (op) { switch (op) {
case EINSERT: case EINSERT:
case EDELETE: case EDELETE:
...@@ -1282,19 +1283,19 @@ struct LogRecInfo { ...@@ -1282,19 +1283,19 @@ struct LogRecInfo {
*/ */
m_tblMeta->trySerializeMetaDataAsMsgArea(m_extra_infos); m_tblMeta->trySerializeMetaDataAsMsgArea(m_extra_infos);
if (useDMB) { if (useLMB) {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = LogMsgAppendBuf(m_tblMeta->getNameData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = lmb->appendBuf(m_tblMeta->getNameData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColEncoding = LogMsgAppendBuf(m_tblMeta->getEncodingData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColEncoding = lmb->appendBuf(m_tblMeta->getEncodingData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkKeys = LogMsgAppendBuf(m_tblMeta->getPkData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkKeys = lmb->appendBuf(m_tblMeta->getPkData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfUkKeys = LogMsgAppendBuf(m_tblMeta->getUkData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfUkKeys = lmb->appendBuf(m_tblMeta->getUkData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkVal = LogMsgAppendBuf(m_tblMeta->getkeyData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkVal = lmb->appendBuf(m_tblMeta->getkeyData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = LogMsgAppendBuf(m_tblMeta->getcolTypeData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = lmb->appendBuf(m_tblMeta->getcolTypeData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColFlag = LogMsgAppendBuf(m_tblMeta->getColumnFlagData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColFlag = lmb->appendBuf(m_tblMeta->getColumnFlagData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNotNull = LogMsgAppendBuf(m_tblMeta->getNotNullData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNotNull = lmb->appendBuf(m_tblMeta->getNotNullData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColSigned = LogMsgAppendBuf(m_tblMeta->getSignedData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColSigned = lmb->appendBuf(m_tblMeta->getSignedData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColDecimals = LogMsgAppendBuf(m_tblMeta->getDecimalsData()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColDecimals = lmb->appendBuf(m_tblMeta->getDecimalsData());
if (!m_timemarks.empty()) if (!m_timemarks.empty())
((PosOfLogMsg_vc*)m_posInfo)->m_posOfTimemark = LogMsgAppendDataArray(m_timemarks); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfTimemark = lmb->appendDataArray(m_timemarks);
} else { } else {
setColNames(m_tblMeta->getNameData()); setColNames(m_tblMeta->getNameData());
setColEncoding(m_tblMeta->getEncodingData()); setColEncoding(m_tblMeta->getEncodingData());
...@@ -1316,29 +1317,29 @@ struct LogRecInfo { ...@@ -1316,29 +1317,29 @@ struct LogRecInfo {
break; break;
} }
case EDDL: case EDDL:
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useDMB && !m_encoding.empty())) if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useLMB && !m_encoding.empty()))
setRecordEncoding("US-ASCII"); setRecordEncoding("US-ASCII");
colCount = m_new_count > 0 ? m_new_count : m_new_cols.size(); colCount = m_new_count > 0 ? m_new_count : m_new_cols.size();
if (!useDMB) { if (!useLMB) {
setColNames(ddlName, colCount); setColNames(ddlName, colCount);
setColTypes(ddlType, colCount); setColTypes(ddlType, colCount);
/* Not know the actual encoding, dangerous */ /* Not know the actual encoding, dangerous */
} else { } else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = LogMsgAppendStringArray(ddlName, colCount); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = lmb->appendStringArray(ddlName, colCount);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = LogMsgAppendDataArray(ddlType, colCount); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = lmb->appendDataArray(ddlType, colCount);
} }
break; break;
case EDML: case EDML:
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useDMB && !m_encoding.empty())) if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useLMB && !m_encoding.empty()))
setRecordEncoding("US-ASCII"); setRecordEncoding("US-ASCII");
if (!useDMB) { if (!useLMB) {
setColNames(&dmlName, 1); setColNames(&dmlName, 1);
setColTypes(dmlType, 1); setColTypes(dmlType, 1);
/* Not know the actual encoding, dangerous */ /* Not know the actual encoding, dangerous */
} else { } else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = LogMsgAppendStringArray(&dmlName, 1); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = lmb->appendStringArray(&dmlName, 1);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = LogMsgAppendDataArray(dmlType, 1); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = lmb->appendDataArray(dmlType, 1);
} }
break; break;
default: default:
...@@ -1357,7 +1358,7 @@ struct LogRecInfo { ...@@ -1357,7 +1358,7 @@ struct LogRecInfo {
* For insert type, no old values, in a similar way, delete-type * For insert type, no old values, in a similar way, delete-type
* record has no new values. Updating record has both values. * record has no new values. Updating record has both values.
*/ */
if (!useDMB) { if (!useLMB) {
if (m_old_count == 0 && m_new_count == 0 && (m_new_cols.size() > 0 || m_old_cols.size() > 0)) { if (m_old_count == 0 && m_new_count == 0 && (m_new_cols.size() > 0 || m_old_cols.size() > 0)) {
setColValuesBeforeImage(); setColValuesBeforeImage();
setColValuesAfterImage(); setColValuesAfterImage();
...@@ -1402,48 +1403,48 @@ struct LogRecInfo { ...@@ -1402,48 +1403,48 @@ struct LogRecInfo {
} else { } else {
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1) { if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1) {
if (m_encoding.empty()) if (m_encoding.empty())
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = LogMsgAppendString(m_tblMeta->getEncoding()); ((PosOfLogMsg_vc*)m_posInfo)->m_encoding = lmb->appendString(m_tblMeta->getEncoding());
else else
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = LogMsgAppendString(m_encoding); ((PosOfLogMsg_vc*)m_posInfo)->m_encoding = lmb->appendString(m_encoding);
} }
if (m_old_count == 0 && m_new_count == 0 && (m_new_cols.size() > 0 || m_old_cols.size() > 0)) { if (m_old_count == 0 && m_new_count == 0 && (m_new_cols.size() > 0 || m_old_cols.size() > 0)) {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = LogMsgAppendStringArray(m_old_cols); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = lmb->appendStringArray(m_old_cols);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = LogMsgAppendStringArray(m_new_cols); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = lmb->appendStringArray(m_new_cols);
} else { } else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = LogMsgAppendBuf(m_old_clum, m_old_count); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = lmb->appendBuf(m_old_clum, m_old_count);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = LogMsgAppendBuf(m_new_clum, m_new_count); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = lmb->appendBuf(m_new_clum, m_new_count);
} }
if (m_dbName.size()) if (m_dbName.size())
((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName = LogMsgAppendString(m_dbName.c_str(), m_dbName.size()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName = lmb->appendString(m_dbName.c_str(), m_dbName.size());
if (m_tbName.size()) if (m_tbName.size())
((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName = LogMsgAppendString(m_tbName.c_str(), m_tbName.size()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName = lmb->appendString(m_tbName.c_str(), m_tbName.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = LogMsgAppendString(m_instance.c_str(), m_instance.size()); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = lmb->appendString(m_instance.c_str(), m_instance.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfFilterRuleVal = LogMsgAppendBuf(m_filter_value, m_filter_count); ((PosOfLogMsg_vc*)m_posInfo)->m_posOfFilterRuleVal = lmb->appendBuf(m_filter_value, m_filter_count);
if (m_endInfo != NULL) { if (m_endInfo != NULL) {
unsigned char* endInfoToLe = new unsigned char[sizeof(EndOfLogMsg)]; unsigned char* endInfoToLe = new unsigned char[sizeof(EndOfLogMsg)];
memcpy(endInfoToLe, (unsigned char*)m_endInfo, sizeof(EndOfLogMsg)); memcpy(endInfoToLe, (unsigned char*)m_endInfo, sizeof(EndOfLogMsg));
exchangeEndInfoToLe(endInfoToLe, sizeof(EndOfLogMsg)); exchangeEndInfoToLe(endInfoToLe, sizeof(EndOfLogMsg));
((PosOfLogMsg_vc*)m_posInfo)->m_posOfEndInfo = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfEndInfo =
LogMsgAppendDataArray((unsigned char*)endInfoToLe, sizeof(EndOfLogMsg)); lmb->appendDataArray((unsigned char*)endInfoToLe, sizeof(EndOfLogMsg));
delete[] endInfoToLe; delete[] endInfoToLe;
} }
char* posInfoToLe = new char[sizeof(PosOfLogMsg_vc)]; char* posInfoToLe = new char[sizeof(PosOfLogMsg_vc)];
memcpy(posInfoToLe, (const char*)m_posInfo, sizeof(PosOfLogMsg_vc)); memcpy(posInfoToLe, (const char*)m_posInfo, sizeof(PosOfLogMsg_vc));
exchangePosInfoToLe(posInfoToLe, sizeof(PosOfLogMsg_vc)); exchangePosInfoToLe(posInfoToLe, sizeof(PosOfLogMsg_vc));
LogMsgCopyHead(posInfoToLe, sizeof(PosOfLogMsg_vc)); lmb->copyHead(posInfoToLe, sizeof(PosOfLogMsg_vc));
LogMsgFroceSetHeadSize(sizeof(PosOfLogMsg_v3)); lmb->froceSetHeadSize(sizeof(PosOfLogMsg_v3));
delete[] posInfoToLe; delete[] posInfoToLe;
const char* msg = LogMsgGetString(size); const char* msg = lmb->getString(size);
if (LOGREC_CRC) { if (LOGREC_CRC) {
EndOfLogMsg* tail = (EndOfLogMsg*)LogMsgGetValueByOffset(((PosOfLogMsg_vc*)m_posInfo)->m_posOfEndInfo); EndOfLogMsg* tail = (EndOfLogMsg*)(lmb->getValueByOffset(((PosOfLogMsg_vc*)m_posInfo)->m_posOfEndInfo));
tail->m_crc = toLeEndianByType((uint32_t)crc32_fast(msg, (char*)tail - msg + offsetof(EndOfLogMsg, m_crc))); tail->m_crc = toLeEndianByType((uint32_t)crc32_fast(msg, (char*)tail - msg + offsetof(EndOfLogMsg, m_crc)));
} }
/* Serialize the header */ /* Serialize the header */
if (reserveMemory) { if (reserveMemory) {
m_reservedMemory = true; m_reservedMemory = true;
/* Only when useDMB is true, the memory of dmb should be copied */ /* Only when useLMB is true, the memory of dmb should be copied */
if (m_lrDataArea->copy(msg, *size) != 0) { if (m_lrDataArea->copy(msg, *size) != 0) {
std::cout << "LOGMESSAGE: toString return 3" << std::endl; std::cout << "LOGMESSAGE: toString return 3" << std::endl;
return NULL; return NULL;
...@@ -1478,9 +1479,9 @@ LogRecordImpl::LogRecordImpl(const void* ptr, size_t size) ...@@ -1478,9 +1479,9 @@ LogRecordImpl::LogRecordImpl(const void* ptr, size_t size)
m_userData = NULL; m_userData = NULL;
} }
LogRecordImpl::LogRecordImpl(bool creating, bool useDMB) LogRecordImpl::LogRecordImpl(bool creating, bool useLMB)
{ {
m_lr = new LogRecInfo(creating, useDMB); m_lr = new LogRecInfo(creating, useLMB);
m_timemarked = false; m_timemarked = false;
m_userData = NULL; m_userData = NULL;
} }
...@@ -1652,9 +1653,9 @@ const char* LogRecordImpl::getSerializedString(size_t* size) ...@@ -1652,9 +1653,9 @@ const char* LogRecordImpl::getSerializedString(size_t* size)
return m_lr->getSerializedString(size); return m_lr->getSerializedString(size);
} }
const char* LogRecordImpl::toString(size_t* size, bool reserveMemory) const char* LogRecordImpl::toString(size_t* size, LogMsgBuf* lmb, bool reserveMemory)
{ {
return m_lr->toString(size, reserveMemory); return m_lr->toString(size, lmb, reserveMemory);
} }
const char* LogRecordImpl::getFormatedString(size_t* size) const char* LogRecordImpl::getFormatedString(size_t* size)
......
...@@ -19,7 +19,7 @@ TEST(LogMsgFactory, LogMsgFactoryAPI) ...@@ -19,7 +19,7 @@ TEST(LogMsgFactory, LogMsgFactoryAPI)
IMetaDataCollections* meta = LogMsgFactory::createMetaDataCollections(); IMetaDataCollections* meta = LogMsgFactory::createMetaDataCollections();
ASSERT_NE((void*)NULL, (void*)meta); ASSERT_NE((void*)NULL, (void*)meta);
ILogRecord* record = LogMsgFactory::createLogRecord(); ILogRecord* record = LogMsgFactory::createLogRecord("LogRecordImpl",true);
ASSERT_NE((void*)NULL, (void*)record); ASSERT_NE((void*)NULL, (void*)record);
LogMsgFactory::destroy(colMeta); LogMsgFactory::destroy(colMeta);
......
...@@ -110,14 +110,14 @@ typedef struct _TestThreadInfo { ...@@ -110,14 +110,14 @@ typedef struct _TestThreadInfo {
void* create(void* argv) void* create(void* argv)
{ {
LogMsgLocalInit(); LogMsgBuf* lmb = new LogMsgBuf();
TestThreadInfo* info = (TestThreadInfo*)argv; TestThreadInfo* info = (TestThreadInfo*)argv;
long matched = 0; long matched = 0;
long mismatched = 0; long mismatched = 0;
while (*(info->quit) == false) { while (*(info->quit) == false) {
ILogRecord* sample = createLogRecord(); ILogRecord* sample = createLogRecord();
size_t pre_toString_size; size_t pre_toString_size;
sample->toString(&pre_toString_size, true); sample->toString(&pre_toString_size, lmb,true);
size_t sample_msg_size; size_t sample_msg_size;
const char* sample_msg_content = sample->getFormatedString(&sample_msg_size); const char* sample_msg_content = sample->getFormatedString(&sample_msg_size);
...@@ -137,7 +137,7 @@ void* create(void* argv) ...@@ -137,7 +137,7 @@ void* create(void* argv)
LogMsgFactory::destroy(sample); LogMsgFactory::destroy(sample);
} }
std::cout << "matched " << matched << ", mismatched " << mismatched << std::endl; std::cout << "matched " << matched << ", mismatched " << mismatched << std::endl;
LogMsgLocalDestroy(); delete lmb;
return NULL; return NULL;
} }
...@@ -147,10 +147,10 @@ TEST(LogRecordImpl, ConcurrencyToString) ...@@ -147,10 +147,10 @@ TEST(LogRecordImpl, ConcurrencyToString)
{ {
bool quit = false; bool quit = false;
/* Create one sample for copmare */ /* Create one sample for copmare */
LogMsgInit(); LogMsgBuf* lmb = new LogMsgBuf();
ILogRecord* sample = createLogRecord(); ILogRecord* sample = createLogRecord();
size_t sample_msg_size; size_t sample_msg_size;
const char* sample_msg_content = sample->toString(&sample_msg_size); const char* sample_msg_content = sample->toString(&sample_msg_size,lmb);
/* ToString in multi-pthreads */ /* ToString in multi-pthreads */
...@@ -172,7 +172,7 @@ TEST(LogRecordImpl, ConcurrencyToString) ...@@ -172,7 +172,7 @@ TEST(LogRecordImpl, ConcurrencyToString)
for (int i = 0; i < Concurrency; i++) for (int i = 0; i < Concurrency; i++)
pthread_join(*(testThreads + i), NULL); pthread_join(*(testThreads + i), NULL);
delete[] testThreads; delete[] testThreads;
LogMsgDestroy(); delete lmb;
} }
/* /*
TEST(LogRecordImpl, ParseAndGet) { TEST(LogRecordImpl, ParseAndGet) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册