...
 
Commits (6)
    https://gitcode.net/oceanbase/oblogmsg/-/commit/762aa44c836e0886ed821ccaf15ea102dd579c73 install oblogmsgConfigVersion.cmake 2021-12-13T10:58:45+08:00 萧壹 xiaoyi.yl@antgroup.com https://gitcode.net/oceanbase/oblogmsg/-/commit/9428651178fad82556de8c0374bc19120ea0d45c formated README.md 2021-12-13T10:58:45+08:00 忠阳 fanzhongyang.fzy@antgroup.com https://gitcode.net/oceanbase/oblogmsg/-/commit/79905a7f7abb879b2a435f0d3dbe0db3def1d8c9 format README.md,add blank space 2021-12-13T10:58:45+08:00 忠阳 fanzhongyang.fzy@antgroup.com https://gitcode.net/oceanbase/oblogmsg/-/commit/482cef30140e3a0daa9c583ea017d9ee1854aebe add crc32 verification 2021-12-13T10:58:45+08:00 忠阳 fanzhongyang.fzy@antgroup.com https://gitcode.net/oceanbase/oblogmsg/-/commit/b290b904222c3f2801f468e6d3cabb4ba3b2a78f hidden symbols in LogMsgBuf.cpp,set HIDDEN_DMB_SYMBOLS = true to enable 2021-12-13T10:58:45+08:00 忠阳 fanzhongyang.fzy@antgroup.com it https://gitcode.net/oceanbase/oblogmsg/-/commit/509d86f9ea7afa6a5e6a577e95d20633f6c584b7 change __thread lmb to LogMsgBuf class,set it in toString 2021-12-13T10:58:45+08:00 忠阳 fanzhongyang.fzy@antgroup.com
cmake_minimum_required(VERSION 3.20.0)
project(oblogmsg VERSION 3.2.1)
include(GNUInstallDirs)
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE RelWithDebInfo CACHE STRING "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel ..." FORCE)
endif()
......@@ -35,6 +33,8 @@ option(INSTALL_OBLOGMSG "Whether or not install oblogmsg" ${OBLOGMSG_MASTER_PROJ
if(INSTALL_OBLOGMSG)
################# INSTALL ##########################################################################
include(GNUInstallDirs)
install(DIRECTORY include/
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}
)
......@@ -48,10 +48,20 @@ if(INSTALL_OBLOGMSG)
install(EXPORT oblogmsg
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/oblogmsg
FILE oblogmsgConfig.cmake
FILE oblogmsg.cmake
NAMESPACE oceanbase::
)
## install oblogmsgConfig.cmake and oblogmsgConfigVersion.cmake
file(WRITE ${CMAKE_CURRENT_BINARY_DIR}/oblogmsgConfig.cmake "include(\${CMAKE_CURRENT_LIST_DIR}/oblogmsg.cmake)\n")
include(CMakePackageConfigHelpers)
write_basic_package_version_file(${CMAKE_CURRENT_BINARY_DIR}/oblogmsgConfigVersion.cmake
COMPATIBILITY SameMajorVersion
)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/oblogmsgConfig.cmake ${CMAKE_CURRENT_BINARY_DIR}/oblogmsgConfigVersion.cmake
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/oblogmsg
)
################# PACKAGE ##########################################################################
set(CPACK_GENERATOR "RPM")
set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
......
此差异已折叠。
......@@ -16,10 +16,10 @@
#endif
#include "itoa.h"
#include "StrArray.h"
using namespace std;
using namespace oceanbase::logmessage;
LogMsgBuf* lmb = NULL;
const char* tableMeta = "[MYTEST,litter]\n"
"id,MYSQL_TYPE_LONG,8,P,\n"
"num1,MYSQL_TYPE_SHORT,4,,\n"
......@@ -618,7 +618,7 @@ LogRecordImpl* createRecordByMeta(ITableMeta* m, int type, bool string_or_point,
#ifdef LMB
LogRecordImpl* r = new LogRecordImpl(true, true);
#else
LogRecordImpl* r = new LogRecordImpl(true, true);
LogRecordImpl* r = new LogRecordImpl(true, false);
#endif
char *buf = NULL, *pos = NULL;
int size = 0, csize = 0;
......@@ -776,7 +776,7 @@ int dm_speed_test(int type, bool string_or_point)
clock_gettime(CLOCK_REALTIME, &s);
for (int i = 0; i < sc; i++) {
#ifdef LMB
lrl[i]->toString(&size);
lrl[i]->toString(&size,lmb);
#else
lrl[i]->toString();
#endif
......@@ -818,13 +818,13 @@ int dm_speed_test(int type, bool string_or_point)
int main(int argc, char* argv[])
{
#ifdef LMB
LogMsgInit();
lmb = new LogMsgBuf();
#endif
dm_speed_test(EINSERT, false);
dm_speed_test(EUPDATE, false);
dm_speed_test(EDELETE, false);
#ifdef LMB
LogMsgDestroy();
delete lmb;
#endif
return 0;
}
......@@ -19,24 +19,35 @@ namespace oceanbase {
namespace logmessage {
struct BinLogBuf;
int LogMsgInit();
int LogMsgLocalInit();
void LogMsgDestroy();
void LogMsgLocalDestroy();
const char* LogMsgGetValueByOffset(size_t offset);
size_t LogMsgAppendString(const char* string, size_t size);
size_t LogMsgAppendString(const std::string& string);
size_t LogMsgAppendBuf(const char* data, size_t size);
size_t LogMsgAppendBuf(const std::string& string);
size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size);
size_t LogMsgAppendStringArray(std::vector<std::string*>& sa);
size_t LogMsgAppendStringArray(const char** sa, size_t size);
void LogMsgSetHead(size_t size);
void LogMsgCopyHead(const char* head, size_t size);
void LogMsgFroceSetHeadSize(size_t size);
const char* LogMsgGetString(size_t* size);
size_t LogMsgAppendDataArray(std::vector<long>& sa);
size_t LogMsgAppendDataArray(uint8_t* sa, size_t size);
class LogMsgBuf {
public:
LogMsgBuf();
~LogMsgBuf();
const char* getValueByOffset(size_t offset);
size_t appendString(const char* string, size_t size);
size_t appendString(const std::string& string);
size_t appendBuf(const char* data, size_t size);
size_t appendBuf(const std::string& string);
size_t appendBuf(const BinLogBuf* sa, size_t size);
size_t appendStringArray(std::vector<std::string*>& sa);
size_t appendStringArray(const char** sa, size_t size);
void setHead(size_t size);
void copyHead(const char* head, size_t size);
void froceSetHeadSize(size_t size);
const char* getString(size_t* size);
size_t appendDataArray(std::vector<long>& sa);
size_t appendDataArray(uint8_t* sa, size_t size);
private:
inline void checkBuf(size_t size, char*& pos, uint32_t*& s, char*& head);
private:
char* buf;
size_t bufSize;
size_t bufPos;
char* defaultBuf;
size_t avg_size;
};
} // namespace logmessage
} // namespace oceanbase
......@@ -15,13 +15,15 @@ See the Mulan PSL v2 for more details. */
#include <cstdint>
#include "UserData.h"
#include "BinLogBuf.h"
#include "LogMsgBuf.h"
namespace oceanbase {
namespace logmessage {
class StrArray;
struct LogRecInfo;
extern bool LOGREC_CRC;
extern bool LOGREC_PARSE_CRC;
#define BR_FAKE_DDL_COLNAME "ddl"
typedef enum RecordType {
EINSERT = 0,
......@@ -233,7 +235,7 @@ public:
* serialize
* @return serialized ptr
*/
virtual const char* toString(size_t* size, bool reserveMemory = false) = 0;
virtual const char* toString(size_t* size, LogMsgBuf* lmb = NULL, bool reserveMemory = false) = 0;
/**
* @return serialized ptr
......@@ -271,7 +273,7 @@ class LogRecordImpl : public ILogRecord {
public:
LogRecordImpl(time_t timestamp, ITableMeta* tblMeta);
LogRecordImpl(const void* ptr, size_t size);
LogRecordImpl(bool creating = true, bool useDMB = false);
LogRecordImpl(bool creating = true, bool useLMB = false);
virtual ~LogRecordImpl();
public:
......@@ -402,7 +404,7 @@ public:
virtual size_t getRealSize();
virtual const char* toString(size_t* size, bool reserveMemory = false);
virtual const char* toString(size_t* size, LogMsgBuf* lmb = NULL, bool reserveMemory = false);
virtual const char* getFormatedString(size_t* size);
......
......@@ -4,6 +4,7 @@ set(SRC_LIST LogMsgBuf.cpp
MetaInfo.cpp
MsgType.cpp
MsgVarArea.cpp
Crc32.cpp
)
add_library(oblogmsg_shared SHARED ${SRC_LIST})
......@@ -11,6 +12,8 @@ add_library(oblogmsg_static STATIC ${SRC_LIST})
foreach(T IN ITEMS oblogmsg_shared oblogmsg_static)
target_compile_features(${T} PUBLIC cxx_std_11)
target_compile_options(${T} PRIVATE -Wno-invalid-offsetof)
set_target_properties(${T} PROPERTIES OUTPUT_NAME oblogmsg
CXX_STANDARD 11
CXX_EXTENSIONS OFF
......@@ -18,7 +21,6 @@ foreach(T IN ITEMS oblogmsg_shared oblogmsg_static)
target_include_directories(${T} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
endforeach()
# ensure that the build results can be run on systems with lower libstdc++ version than the build system
target_link_libraries(oblogmsg_shared PRIVATE -static-libstdc++)
......
此差异已折叠。
// //////////////////////////////////////////////////////////
// Crc32.cpp
// Copyright (c) 2011-2015 Stephan Brumme. All rights reserved.
// Slicing-by-16 contributed by Bulat Ziganshin
// see http://create.stephan-brumme.com/disclaimer.html
//
// g++ -o Crc32 Crc32.cpp -O3 -lrt -march=native -mtune=native
// if running on an embedded system, you might consider shrinking the
// big Crc32Lookup table:
// - crc32_bitwise doesn't need it at all
// - crc32_halfbyte has its own small lookup table
// - crc32_1byte needs only Crc32Lookup[0]
// - crc32_4bytes needs only Crc32Lookup[0..3]
// - crc32_8bytes needs only Crc32Lookup[0..7]
// - crc32_4x8bytes needs only Crc32Lookup[0..7]
// - crc32_16bytes needs all of Crc32Lookup
uint32_t crc32_halfbyte(const void* data, size_t length, uint32_t previousCrc32 = 0);
uint32_t crc32_1byte(const void* data, size_t length, uint32_t previousCrc32 = 0);
uint32_t crc32_4bytes(const void* data, size_t length, uint32_t previousCrc32 = 0);
uint32_t crc32_8bytes(const void* data, size_t length, uint32_t previousCrc32 = 0);
uint32_t crc32_4x8bytes(const void* data, size_t length, uint32_t previousCrc32 = 0);
uint32_t crc32_16bytes(const void* data, size_t length, uint32_t previousCrc32 = 0);
uint32_t crc32_16bytes_prefetch(
const void* data, size_t length, uint32_t previousCrc32 = 0, size_t prefetchAhead = 256);
#define crc32_fast crc32_16bytes
......@@ -19,154 +19,107 @@ See the Mulan PSL v2 for more details. */
namespace oceanbase {
namespace logmessage {
typedef struct _LogMsgBuf {
char* buf;
size_t bufSize;
size_t bufPos;
char* defaultBuf;
size_t avg_size;
} LogMsgBuf;
#define LogMsgHeadSize (sizeof(MsgHeader) + sizeof(COUNT_TYPE))
#define DefaultLogMsgBufSize 1024 * 1024 * 32
#define LOG_MSG_MAX_TAIL_SIZE 1024
static LogMsgBuf lmb_global = {NULL, 0, 0, NULL, 0};
static __thread LogMsgBuf* lmb = &lmb_global;
static inline void lm_check_buf(LogMsgBuf* lmbuf, size_t size, char*& pos, STRLEN_TYPE*& s, char*& head)
{
if (lmbuf->bufSize < size + (pos - lmbuf->buf) + LOG_MSG_MAX_TAIL_SIZE) {
char* tmp = new char[lmbuf->bufSize = (pos - lmbuf->buf) + 2 * (size) + LOG_MSG_MAX_TAIL_SIZE];
memcpy(tmp, lmbuf->buf, pos - lmbuf->buf);
pos = tmp + (pos - lmbuf->buf);
s = (STRLEN_TYPE*)(tmp + ((char*)s - lmbuf->buf));
head = tmp + ((char*)(head)-lmbuf->buf);
if (lmbuf->buf != lmbuf->defaultBuf)
delete[] lmbuf->buf;
lmbuf->buf = tmp;
}
}
int LogMsgLocalInit()
inline void LogMsgBuf::checkBuf(size_t size, char*& pos, STRLEN_TYPE*& s, char*& head)
{
LogMsgLocalDestroy();
if (NULL == (lmb = new LogMsgBuf))
return -1;
if (NULL == (lmb->buf = new char[lmb->bufSize = DefaultLogMsgBufSize])) {
delete lmb;
lmb = &lmb_global;
return -1;
if (bufSize < size + (pos - buf) + LOG_MSG_MAX_TAIL_SIZE) {
char* tmp = new char[bufSize = (pos - buf) + 2 * (size) + LOG_MSG_MAX_TAIL_SIZE];
memcpy(tmp, buf, pos - buf);
pos = tmp + (pos - buf);
s = (STRLEN_TYPE*)(tmp + ((char*)s - buf));
head = tmp + ((char*)(head)-buf);
if (buf != defaultBuf)
delete[] buf;
buf = tmp;
}
lmb->defaultBuf = lmb->buf;
lmb->bufPos = 0;
return 0;
}
void LogMsgLocalDestroy()
LogMsgBuf::LogMsgBuf()
{
if (lmb == &lmb_global)
return;
if (lmb != NULL) {
if (lmb->buf != NULL)
delete[] lmb->buf;
if (lmb->defaultBuf != lmb->buf && lmb->defaultBuf != NULL)
delete[] lmb->defaultBuf;
delete lmb;
lmb = &lmb_global;
}
}
int LogMsgInit()
{
if (lmb != &lmb_global) {
LogMsgLocalDestroy();
lmb = &lmb_global;
}
LogMsgDestroy();
if (NULL == (lmb_global.buf = new char[lmb_global.bufSize = DefaultLogMsgBufSize]))
return -1;
lmb_global.defaultBuf = lmb_global.buf;
lmb_global.bufPos = 0;
return 0;
buf = new char[bufSize = DefaultLogMsgBufSize];
defaultBuf = buf;
bufPos = 0;
}
void LogMsgDestroy()
LogMsgBuf::~LogMsgBuf()
{
if (lmb != &lmb_global)
return;
if (lmb->buf != NULL)
delete[] lmb->buf;
if (lmb->defaultBuf != lmb->buf && lmb->defaultBuf != NULL)
delete[] lmb->defaultBuf;
lmb->buf = lmb->defaultBuf = NULL;
if (buf != NULL)
delete[] buf;
if (defaultBuf != buf && defaultBuf !=NULL)
delete[] defaultBuf;
buf = NULL;
defaultBuf = NULL;
bufPos = 0;
}
const char* LogMsgGetValueByOffset(size_t offset)
const char* LogMsgBuf::getValueByOffset(size_t offset)
{
return lmb->buf + offset + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE);
return buf + offset + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE);
}
size_t LogMsgAppendString(const char* string, size_t size)
size_t LogMsgBuf::appendString(const char* string, size_t size)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING);
size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING);
if (string == NULL) {
(*(DT_TYPE*)(lmb->buf + lmb->bufPos)) |= DC_NULL;
return (lmb->bufPos += sizeof(DT_TYPE));
(*(DT_TYPE*)(buf + bufPos)) |= DC_NULL;
return (bufPos += sizeof(DT_TYPE));
}
*(STRLEN_TYPE*)(lmb->buf + lmb->bufPos + sizeof(DT_TYPE)) = size + 1;
toLeEndian(lmb->buf + lmb->bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE));
memcpy(lmb->buf + lmb->bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string, size);
lmb->bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size) + 1;
lmb->buf[lmb->bufPos - 1] = 0;
*(STRLEN_TYPE*)(buf + bufPos + sizeof(DT_TYPE)) = size + 1;
toLeEndian(buf + bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE));
memcpy(buf + bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string, size);
bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size) + 1;
buf[bufPos - 1] = 0;
return offset;
}
size_t LogMsgAppendString(const std::string& string)
size_t LogMsgBuf::appendString(const std::string& string)
{
size_t offset = lmb->bufPos - LogMsgHeadSize, size = string.size();
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING);
size_t offset = bufPos - LogMsgHeadSize, size = string.size();
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING);
if (size == 0) {
(*(DT_TYPE*)(lmb->buf + lmb->bufPos)) |= DC_NULL;
return (lmb->bufPos += sizeof(DT_TYPE));
(*(DT_TYPE*)(buf + bufPos)) |= DC_NULL;
return (bufPos += sizeof(DT_TYPE));
}
*(STRLEN_TYPE*)(lmb->buf + lmb->bufPos + sizeof(DT_TYPE)) = size + 1;
toLeEndian(lmb->buf + lmb->bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE));
strncpy(lmb->buf + lmb->bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string.c_str(), size + 1);
lmb->bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size + 1);
*(STRLEN_TYPE*)(buf + bufPos + sizeof(DT_TYPE)) = size + 1;
toLeEndian(buf + bufPos + sizeof(DT_TYPE), sizeof(STRLEN_TYPE));
strncpy(buf + bufPos + sizeof(DT_TYPE) + sizeof(STRLEN_TYPE), string.c_str(), size + 1);
bufPos += (sizeof(DT_TYPE) + sizeof(STRLEN_TYPE) + size + 1);
return offset;
}
size_t LogMsgAppendBuf(const char* data, size_t size)
size_t LogMsgBuf::appendBuf(const char* data, size_t size)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
memcpy(lmb->buf + lmb->bufPos, data, size);
lmb->bufPos += size;
size_t offset = bufPos - LogMsgHeadSize;
memcpy(buf + bufPos, data, size);
bufPos += size;
return offset;
}
size_t LogMsgAppendBuf(const std::string& string)
size_t LogMsgBuf::appendBuf(const std::string& string)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
memcpy(lmb->buf + lmb->bufPos, string.c_str(), string.size());
lmb->bufPos += string.size();
size_t offset = bufPos - LogMsgHeadSize;
memcpy(buf + bufPos, string.c_str(), string.size());
bufPos += string.size();
return offset;
}
size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size)
size_t LogMsgBuf::appendBuf(const BinLogBuf* sa, size_t size)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE);
size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
bufPos += sizeof(DT_TYPE);
if (sa == NULL || size == 0) {
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(0);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE));
lmb->bufPos += sizeof(COUNT_TYPE);
*(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(0);
toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
bufPos += sizeof(COUNT_TYPE);
return offset;
}
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(size);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE));
lmb->bufPos += sizeof(COUNT_TYPE);
*(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(size);
toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
bufPos += sizeof(COUNT_TYPE);
COUNT_TYPE i = 0;
char *pos = lmb->buf + lmb->bufPos, *head;
char *pos = buf + bufPos, *head;
STRLEN_TYPE* s = (STRLEN_TYPE*)pos;
pos += sizeof(STRLEN_TYPE) * (size + 1);
head = pos;
......@@ -174,7 +127,7 @@ size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size)
s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
if (sa[i].buf != NULL) {
lm_check_buf(lmb, sa[i].buf_used_size, pos, s, head);
checkBuf(sa[i].buf_used_size, pos, s, head);
memcpy(pos, sa[i].buf, sa[i].buf_used_size);
pos[sa[i].buf_used_size] = 0;
pos += (sa[i].buf_used_size + 1);
......@@ -182,24 +135,24 @@ size_t LogMsgAppendBuf(const BinLogBuf* sa, size_t size)
}
s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
lmb->bufPos = pos - lmb->buf;
bufPos = pos - buf;
return offset;
}
size_t LogMsgAppendStringArray(std::vector<std::string*>& sa)
size_t LogMsgBuf::appendStringArray(std::vector<std::string*>& sa)
{
size_t offset = lmb->bufPos - LogMsgHeadSize, size = sa.size();
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(sa.size());
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE));
size_t offset = bufPos - LogMsgHeadSize, size = sa.size();
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(sa.size());
toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(sa.size()) == 0) {
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
return offset;
}
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
STRLEN_TYPE len;
COUNT_TYPE i = 0;
char *pos = lmb->buf + lmb->bufPos, *head;
char *pos = buf + bufPos, *head;
STRLEN_TYPE* s = (STRLEN_TYPE*)pos;
pos += sizeof(STRLEN_TYPE) * (size + 1);
head = pos;
......@@ -208,7 +161,7 @@ size_t LogMsgAppendStringArray(std::vector<std::string*>& sa)
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
if (sa[i] != NULL) {
len = sa[i]->size();
lm_check_buf(lmb, len, pos, s, head);
checkBuf(len, pos, s, head);
memcpy(pos, sa[i]->c_str(), len);
pos[len] = 0;
pos += (len + 1);
......@@ -216,24 +169,24 @@ size_t LogMsgAppendStringArray(std::vector<std::string*>& sa)
}
s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
lmb->bufPos = pos - lmb->buf;
bufPos = pos - buf;
return offset;
}
size_t LogMsgAppendStringArray(const char** sa, size_t size)
size_t LogMsgBuf::appendStringArray(const char** sa, size_t size)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(size);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE));
size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_STRING | DC_ARRAY);
bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(size);
toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(size) == 0) {
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
return offset;
}
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
STRLEN_TYPE len;
COUNT_TYPE i = 0;
char *pos = lmb->buf + lmb->bufPos, *head;
char *pos = buf + bufPos, *head;
STRLEN_TYPE* s = (STRLEN_TYPE*)pos;
pos += sizeof(STRLEN_TYPE) * (size + 1);
head = pos;
......@@ -242,104 +195,104 @@ size_t LogMsgAppendStringArray(const char** sa, size_t size)
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
if (sa[i] != NULL) {
len = strlen(sa[i]) + 1;
lm_check_buf(lmb, len, pos, s, head);
checkBuf(len, pos, s, head);
memcpy(pos, sa[i], len);
pos += len;
}
}
s[i] = pos - head;
toLeEndian(&(s[i]), sizeof(STRLEN_TYPE));
lmb->bufPos = pos - lmb->buf;
bufPos = pos - buf;
return offset;
}
size_t LogMsgAppendDataArray(std::vector<long>& sa)
size_t LogMsgBuf::appendDataArray(std::vector<long>& sa)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_INT64 | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(sa.size());
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE));
size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_INT64 | DC_ARRAY);
bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(sa.size());
toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(sa.size()) == 0) {
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
return offset;
}
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
COUNT_TYPE i = 0, j = sa.size();
char* pos = lmb->buf + lmb->bufPos;
char* pos = buf + bufPos;
pos += sizeof(STRLEN_TYPE) * i;
for (i = 0; i < j; ++i) {
*(long*)pos = sa[i];
toLeEndian(pos, sizeof(long));
pos += sizeof(long);
}
lmb->bufPos = pos - lmb->buf;
bufPos = pos - buf;
return offset;
}
size_t LogMsgAppendDataArray(uint8_t* sa, size_t size)
size_t LogMsgBuf::appendDataArray(uint8_t* sa, size_t size)
{
size_t offset = lmb->bufPos - LogMsgHeadSize;
*(DT_TYPE*)(lmb->buf + lmb->bufPos) = (DT_TYPE)(DT_UINT8 | DC_ARRAY);
lmb->bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(lmb->buf + lmb->bufPos) = (COUNT_TYPE)(size);
toLeEndian(lmb->buf + lmb->bufPos, sizeof(COUNT_TYPE));
size_t offset = bufPos - LogMsgHeadSize;
*(DT_TYPE*)(buf + bufPos) = (DT_TYPE)(DT_UINT8 | DC_ARRAY);
bufPos += sizeof(DT_TYPE);
*(COUNT_TYPE*)(buf + bufPos) = (COUNT_TYPE)(size);
toLeEndian(buf + bufPos, sizeof(COUNT_TYPE));
if ((COUNT_TYPE)(size) == 0 || sa == NULL) {
lmb->bufPos += sizeof(COUNT_TYPE);
bufPos += sizeof(COUNT_TYPE);
return offset;
}
lmb->bufPos += sizeof(COUNT_TYPE);
memcpy(lmb->buf + lmb->bufPos, sa, size);
lmb->bufPos += size;
bufPos += sizeof(COUNT_TYPE);
memcpy(buf + bufPos, sa, size);
bufPos += size;
return offset;
}
void LogMsgSetHead(size_t size)
void LogMsgBuf::setHead(size_t size)
{
lmb->bufPos = size + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE);
if (lmb->buf != lmb->defaultBuf) {
if (lmb->avg_size > DefaultLogMsgBufSize) {
if (lmb->avg_size < ((lmb->bufSize + DefaultLogMsgBufSize) >> 1)) {
delete[] lmb->buf;
lmb->buf = new char[lmb->bufSize = (lmb->bufSize + DefaultLogMsgBufSize) >> 1];
bufPos = size + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE);
if (buf != defaultBuf) {
if (avg_size > DefaultLogMsgBufSize) {
if (avg_size < ((bufSize + DefaultLogMsgBufSize) >> 1)) {
delete[] buf;
buf = new char[bufSize = (bufSize + DefaultLogMsgBufSize) >> 1];
}
} else {
delete[] lmb->buf;
lmb->buf = lmb->defaultBuf;
lmb->bufSize = DefaultLogMsgBufSize;
delete[] buf;
buf = defaultBuf;
bufSize = DefaultLogMsgBufSize;
}
}
}
void LogMsgCopyHead(const char* head, size_t size)
void LogMsgBuf::copyHead(const char* head, size_t size)
{
((struct MsgHeader*)lmb->buf)->m_msgType = MT_VAR;
toLeEndian(&(((struct MsgHeader*)lmb->buf)->m_msgType), sizeof(uint16_t));
((struct MsgHeader*)lmb->buf)->m_version = 1;
toLeEndian(&(((struct MsgHeader*)lmb->buf)->m_version), sizeof(uint16_t));
((struct MsgHeader*)lmb->buf)->m_size = lmb->bufPos - (sizeof(struct MsgHeader));
toLeEndian(&(((struct MsgHeader*)lmb->buf)->m_size), sizeof(uint32_t));
((struct MsgHeader*)buf)->m_msgType = MT_VAR;
toLeEndian(&(((struct MsgHeader*)buf)->m_msgType), sizeof(uint16_t));
((struct MsgHeader*)buf)->m_version = 1;
toLeEndian(&(((struct MsgHeader*)buf)->m_version), sizeof(uint16_t));
((struct MsgHeader*)buf)->m_size = bufPos - (sizeof(struct MsgHeader));
toLeEndian(&(((struct MsgHeader*)buf)->m_size), sizeof(uint32_t));
*(COUNT_TYPE*)(lmb->buf + sizeof(struct MsgHeader)) = 0;
toLeEndian(lmb->buf + sizeof(struct MsgHeader), sizeof(COUNT_TYPE));
*(lmb->buf + LogMsgHeadSize) = (DT_TYPE)(DT_UINT8 | DC_ARRAY);
toLeEndian(lmb->buf + LogMsgHeadSize, sizeof(DT_TYPE));
*(COUNT_TYPE*)(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size;
toLeEndian(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE));
memcpy(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE), head, size);
*(COUNT_TYPE*)(buf + sizeof(struct MsgHeader)) = 0;
toLeEndian(buf + sizeof(struct MsgHeader), sizeof(COUNT_TYPE));
*(buf + LogMsgHeadSize) = (DT_TYPE)(DT_UINT8 | DC_ARRAY);
toLeEndian(buf + LogMsgHeadSize, sizeof(DT_TYPE));
*(COUNT_TYPE*)(buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size;
toLeEndian(buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE));
memcpy(buf + LogMsgHeadSize + sizeof(DT_TYPE) + sizeof(COUNT_TYPE), head, size);
}
void LogMsgFroceSetHeadSize(size_t size)
void LogMsgBuf::froceSetHeadSize(size_t size)
{
*(COUNT_TYPE*)(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size;
toLeEndian(lmb->buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE));
*(COUNT_TYPE*)(buf + LogMsgHeadSize + sizeof(DT_TYPE)) = size;
toLeEndian(buf + LogMsgHeadSize + sizeof(DT_TYPE), sizeof(COUNT_TYPE));
}
const char* LogMsgGetString(size_t* size)
const char* LogMsgBuf::getString(size_t* size)
{
*size = lmb->bufPos;
if (lmb->buf != lmb->defaultBuf && lmb->bufPos > DefaultLogMsgBufSize)
lmb->avg_size = ((((long)lmb->avg_size >> 1) + (long)lmb->bufPos) >> 1) + (lmb->bufPos >> 2);
*size = bufPos;
if (buf != defaultBuf && bufPos > DefaultLogMsgBufSize)
avg_size = ((((long)avg_size >> 1) + (long)bufPos) >> 1) + (bufPos >> 2);
else
lmb->avg_size = (((long)lmb->avg_size + (long)lmb->bufPos) >> 1);
return lmb->buf;
avg_size = (((long)avg_size + (long)bufPos) >> 1);
return buf;
}
} // namespace logmessage
......
......@@ -20,6 +20,7 @@ See the Mulan PSL v2 for more details. */
#include <cinttypes>
#include <iostream>
#include "LogMsgBuf.h"
#include "Crc32.h"
namespace oceanbase {
namespace logmessage {
......@@ -32,6 +33,8 @@ const uint16_t LOGREC_VERSION = 3; // version num
const uint64_t LOGREC_SUB_VERSION = 0x0200000000000000; // sub version num
#define GET_LOGREC_SUB_VERSION(v) ((v)&0xFF00000000000000)
const uint64_t LOGREC_INVALID_ID = 0;
bool LOGREC_CRC = false;
bool LOGREC_PARSE_CRC = false;
// LogRecord head info
struct PosOfLogMsg_vb {
......@@ -137,7 +140,7 @@ struct LogRecInfo {
string m_instance;
string m_encoding;
string m_ob_trace_info;
bool useDMB;
bool useLMB;
bool m_reservedMemory;
LogRecInfo(time_t timestamp, ITableMeta* tblMeta)
: m_creatingMode(true),
......@@ -149,7 +152,7 @@ struct LogRecInfo {
m_tblMeta(tblMeta),
m_dbMeta(NULL),
m_expiredMetaDataCollections(NULL),
useDMB(false),
useLMB(false),
m_reservedMemory(false)
{
m_posInfo = new PosOfLogMsg_vc;
......@@ -184,7 +187,7 @@ struct LogRecInfo {
m_tblMeta(NULL),
m_dbMeta(NULL),
m_expiredMetaDataCollections(NULL),
useDMB(false),
useLMB(false),
m_reservedMemory(false)
{
m_lrDataArea = new MsgVarArea(false);
......@@ -198,7 +201,7 @@ struct LogRecInfo {
m_filter_max_count = FILTER_SIZE;
}
LogRecInfo(bool creating, bool useDMB = false)
LogRecInfo(bool creating, bool useLMB = false)
: m_creatingMode(creating),
m_parsedOK(false),
m_tailParseOK(false),
......@@ -221,9 +224,9 @@ struct LogRecInfo {
m_posInfo->m_id = LOGREC_INVALID_ID;
m_posInfo->m_srcType = SRC_MYSQL;
m_posInfo->m_srcCategory = SRC_FULL_RECORDED;
this->useDMB = useDMB;
this->useLMB = useLMB;
} else {
this->useDMB = false;
this->useLMB= false;
m_lrDataArea = new MsgVarArea(false);
}
m_reservedMemory = false;
......@@ -256,7 +259,7 @@ struct LogRecInfo {
void clear()
{
if (m_creatingMode) {
if (!useDMB) {
if (!useLMB) {
m_lrDataArea->clear();
m_lrDataArea->appendArray((uint8_t*)m_posInfo, sizeof(PosOfLogMsg_vc));
} else {
......@@ -511,6 +514,10 @@ struct LogRecInfo {
exchangeEndInfoToLe(m_endInfo, count);
exchangePosInfoToLe((const char*)m_posInfo, count);
if (m_tailParseOK && LOGREC_PARSE_CRC && count >= sizeof(struct EndOfLogMsg_v2) && m_endInfo->m_crc != 0) {
if (m_endInfo->m_crc != crc32_fast((char*)ptr, (const char*)v - (const char*)ptr + offsetof(EndOfLogMsg, m_crc)))
return -5;
}
m_parsedOK = true;
return 0;
}
......@@ -838,7 +845,7 @@ struct LogRecInfo {
void setInstance(const char* instance)
{
if (useDMB) {
if (useLMB) {
SET_OR_CLEAR_STRING(m_instance, instance);
} else
((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = m_lrDataArea->appendString(instance);
......@@ -846,7 +853,7 @@ struct LogRecInfo {
const char* instance()
{
if (useDMB)
if (useLMB)
return m_instance.c_str();
size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance;
return m_lrDataArea->getString(offset);
......@@ -854,7 +861,7 @@ struct LogRecInfo {
void setDbname(const char* dbname)
{
if (useDMB) {
if (useLMB) {
SET_OR_CLEAR_STRING(m_dbName, dbname);
} else {
// when m_creatingMode is false,before call function parse,m_posInfo maybe null
......@@ -866,7 +873,7 @@ struct LogRecInfo {
const char* dbname() const
{
if (useDMB)
if (useLMB)
return m_dbName.c_str();
size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName;
return m_lrDataArea->getString(offset);
......@@ -874,7 +881,7 @@ struct LogRecInfo {
void setTbname(const char* tbname)
{
if (useDMB) {
if (useLMB) {
SET_OR_CLEAR_STRING(m_tbName, tbname);
} else {
if (m_posInfo != NULL) {
......@@ -885,7 +892,7 @@ struct LogRecInfo {
const char* tbname() const
{
if (useDMB)
if (useLMB)
return m_tbName.c_str();
size_t offset = ((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName;
return m_lrDataArea->getString(offset);
......@@ -971,7 +978,7 @@ struct LogRecInfo {
void setRecordEncoding(const char* encoding)
{
if (useDMB)
if (useLMB)
SET_OR_CLEAR_STRING(m_encoding, encoding);
else
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = m_lrDataArea->appendString(encoding);
......@@ -994,7 +1001,7 @@ struct LogRecInfo {
void setObTraceInfo(const char* ob_trace_info)
{
if (useDMB) {
if (useLMB) {
SET_OR_CLEAR_STRING(m_ob_trace_info, ob_trace_info);
} else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfObTraceInfo = m_lrDataArea->appendString(ob_trace_info);
......@@ -1004,7 +1011,7 @@ struct LogRecInfo {
const char* obTraceInfo()
{
if (m_creatingMode || m_parsedOK) {
if (useDMB) {
if (useLMB) {
return m_ob_trace_info.c_str();
} else {
if (GET_LOGREC_SUB_VERSION(m_posInfo->m_id) >= LOGREC_SUB_VERSION) {
......@@ -1244,16 +1251,17 @@ struct LogRecInfo {
*
* @return the pointer to the serialized string, NULL is returned if failed
*/
const char* toString(size_t* size, bool reserveMemory = false)
const char* toString(size_t* size, LogMsgBuf* lmb = NULL, bool reserveMemory = false)
{
if (!m_creatingMode) {
return getSerializedString(size);
}
/* Always use the latest version to do the serialization */
int colCount = 0;
uint8_t op = m_posInfo->m_op;
if (useDMB)
LogMsgSetHead(sizeof(PosOfLogMsg_vc));
if (useLMB)
lmb->setHead(sizeof(PosOfLogMsg_vc));
switch (op) {
case EINSERT:
case EDELETE:
......@@ -1275,19 +1283,19 @@ struct LogRecInfo {
*/
m_tblMeta->trySerializeMetaDataAsMsgArea(m_extra_infos);
if (useDMB) {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = LogMsgAppendBuf(m_tblMeta->getNameData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColEncoding = LogMsgAppendBuf(m_tblMeta->getEncodingData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkKeys = LogMsgAppendBuf(m_tblMeta->getPkData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfUkKeys = LogMsgAppendBuf(m_tblMeta->getUkData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkVal = LogMsgAppendBuf(m_tblMeta->getkeyData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = LogMsgAppendBuf(m_tblMeta->getcolTypeData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColFlag = LogMsgAppendBuf(m_tblMeta->getColumnFlagData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNotNull = LogMsgAppendBuf(m_tblMeta->getNotNullData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColSigned = LogMsgAppendBuf(m_tblMeta->getSignedData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColDecimals = LogMsgAppendBuf(m_tblMeta->getDecimalsData());
if (useLMB) {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = lmb->appendBuf(m_tblMeta->getNameData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColEncoding = lmb->appendBuf(m_tblMeta->getEncodingData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkKeys = lmb->appendBuf(m_tblMeta->getPkData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfUkKeys = lmb->appendBuf(m_tblMeta->getUkData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfPkVal = lmb->appendBuf(m_tblMeta->getkeyData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = lmb->appendBuf(m_tblMeta->getcolTypeData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColFlag = lmb->appendBuf(m_tblMeta->getColumnFlagData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNotNull = lmb->appendBuf(m_tblMeta->getNotNullData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColSigned = lmb->appendBuf(m_tblMeta->getSignedData());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColDecimals = lmb->appendBuf(m_tblMeta->getDecimalsData());
if (!m_timemarks.empty())
((PosOfLogMsg_vc*)m_posInfo)->m_posOfTimemark = LogMsgAppendDataArray(m_timemarks);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfTimemark = lmb->appendDataArray(m_timemarks);
} else {
setColNames(m_tblMeta->getNameData());
setColEncoding(m_tblMeta->getEncodingData());
......@@ -1309,29 +1317,29 @@ struct LogRecInfo {
break;
}
case EDDL:
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useDMB && !m_encoding.empty()))
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useLMB && !m_encoding.empty()))
setRecordEncoding("US-ASCII");
colCount = m_new_count > 0 ? m_new_count : m_new_cols.size();
if (!useDMB) {
if (!useLMB) {
setColNames(ddlName, colCount);
setColTypes(ddlType, colCount);
/* Not know the actual encoding, dangerous */
} else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = LogMsgAppendStringArray(ddlName, colCount);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = LogMsgAppendDataArray(ddlType, colCount);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = lmb->appendStringArray(ddlName, colCount);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = lmb->appendDataArray(ddlType, colCount);
}
break;
case EDML:
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useDMB && !m_encoding.empty()))
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1 && !(useLMB && !m_encoding.empty()))
setRecordEncoding("US-ASCII");
if (!useDMB) {
if (!useLMB) {
setColNames(&dmlName, 1);
setColTypes(dmlType, 1);
/* Not know the actual encoding, dangerous */
} else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = LogMsgAppendStringArray(&dmlName, 1);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = LogMsgAppendDataArray(dmlType, 1);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColNames = lmb->appendStringArray(&dmlName, 1);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfColTypes = lmb->appendDataArray(dmlType, 1);
}
break;
default:
......@@ -1350,7 +1358,7 @@ struct LogRecInfo {
* For insert type, no old values, in a similar way, delete-type
* record has no new values. Updating record has both values.
*/
if (!useDMB) {
if (!useLMB) {
if (m_old_count == 0 && m_new_count == 0 && (m_new_cols.size() > 0 || m_old_cols.size() > 0)) {
setColValuesBeforeImage();
setColValuesAfterImage();
......@@ -1385,52 +1393,58 @@ struct LogRecInfo {
/*force set header length to PosOfLogMsg_v3,because client will check it*/
*((uint32_t*)v - 1) = toLeEndianByType((uint32_t)sizeof(PosOfLogMsg_v3));
delete[] posInfoToLe;
if (LOGREC_CRC) {
EndOfLogMsg* tail = (EndOfLogMsg*)(m.c_str() + m.size() - sizeof(EndOfLogMsg));
tail->m_crc = toLeEndianByType((uint32_t)crc32_fast(m.c_str(), (char*)tail - m.c_str() + offsetof(EndOfLogMsg, m_crc)));
}
m_parsedOK = true; // to support fetching
*size = m.size();
return m.c_str();
} else {
if ((int)((PosOfLogMsg_vc*)m_posInfo)->m_encoding == -1) {
if (m_encoding.empty())
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = LogMsgAppendString(m_tblMeta->getEncoding());
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = lmb->appendString(m_tblMeta->getEncoding());
else
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = LogMsgAppendString(m_encoding);
((PosOfLogMsg_vc*)m_posInfo)->m_encoding = lmb->appendString(m_encoding);
}
if (m_old_count == 0 && m_new_count == 0 && (m_new_cols.size() > 0 || m_old_cols.size() > 0)) {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = LogMsgAppendStringArray(m_old_cols);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = LogMsgAppendStringArray(m_new_cols);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = lmb->appendStringArray(m_old_cols);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = lmb->appendStringArray(m_new_cols);
} else {
((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = LogMsgAppendBuf(m_old_clum, m_old_count);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = LogMsgAppendBuf(m_new_clum, m_new_count);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfOldCols = lmb->appendBuf(m_old_clum, m_old_count);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfNewCols = lmb->appendBuf(m_new_clum, m_new_count);
}
if (m_dbName.size())
((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName = LogMsgAppendString(m_dbName.c_str(), m_dbName.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfDbName = lmb->appendString(m_dbName.c_str(), m_dbName.size());
if (m_tbName.size())
((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName = LogMsgAppendString(m_tbName.c_str(), m_tbName.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = LogMsgAppendString(m_instance.c_str(), m_instance.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfFilterRuleVal = LogMsgAppendBuf(m_filter_value, m_filter_count);
((PosOfLogMsg_vc*)m_posInfo)->m_posOfTbName = lmb->appendString(m_tbName.c_str(), m_tbName.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfInstance = lmb->appendString(m_instance.c_str(), m_instance.size());
((PosOfLogMsg_vc*)m_posInfo)->m_posOfFilterRuleVal = lmb->appendBuf(m_filter_value, m_filter_count);
if (m_endInfo != NULL) {
unsigned char* endInfoToLe = new unsigned char[sizeof(EndOfLogMsg)];
memcpy(endInfoToLe, (unsigned char*)m_endInfo, sizeof(EndOfLogMsg));
exchangeEndInfoToLe(endInfoToLe, sizeof(EndOfLogMsg));
((PosOfLogMsg_vc*)m_posInfo)->m_posOfEndInfo =
LogMsgAppendDataArray((unsigned char*)endInfoToLe, sizeof(EndOfLogMsg));
lmb->appendDataArray((unsigned char*)endInfoToLe, sizeof(EndOfLogMsg));
delete[] endInfoToLe;
}
char* posInfoToLe = new char[sizeof(PosOfLogMsg_vc)];
memcpy(posInfoToLe, (const char*)m_posInfo, sizeof(PosOfLogMsg_vc));
exchangePosInfoToLe(posInfoToLe, sizeof(PosOfLogMsg_vc));
LogMsgCopyHead(posInfoToLe, sizeof(PosOfLogMsg_vc));
LogMsgFroceSetHeadSize(sizeof(PosOfLogMsg_v3));
lmb->copyHead(posInfoToLe, sizeof(PosOfLogMsg_vc));
lmb->froceSetHeadSize(sizeof(PosOfLogMsg_v3));
delete[] posInfoToLe;
const char* msg = LogMsgGetString(size);
const char* msg = lmb->getString(size);
if (LOGREC_CRC) {
EndOfLogMsg* tail = (EndOfLogMsg*)(lmb->getValueByOffset(((PosOfLogMsg_vc*)m_posInfo)->m_posOfEndInfo));
tail->m_crc = toLeEndianByType((uint32_t)crc32_fast(msg, (char*)tail - msg + offsetof(EndOfLogMsg, m_crc)));
}
/* Serialize the header */
if (reserveMemory) {
m_reservedMemory = true;
/* Only when useDMB is true, the memory of dmb should be copied */
/* Only when useLMB is true, the memory of dmb should be copied */
if (m_lrDataArea->copy(msg, *size) != 0) {
std::cout << "LOGMESSAGE: toString return 3" << std::endl;
return NULL;
......@@ -1465,9 +1479,9 @@ LogRecordImpl::LogRecordImpl(const void* ptr, size_t size)
m_userData = NULL;
}
LogRecordImpl::LogRecordImpl(bool creating, bool useDMB)
LogRecordImpl::LogRecordImpl(bool creating, bool useLMB)
{
m_lr = new LogRecInfo(creating, useDMB);
m_lr = new LogRecInfo(creating, useLMB);
m_timemarked = false;
m_userData = NULL;
}
......@@ -1639,9 +1653,9 @@ const char* LogRecordImpl::getSerializedString(size_t* size)
return m_lr->getSerializedString(size);
}
const char* LogRecordImpl::toString(size_t* size, bool reserveMemory)
const char* LogRecordImpl::toString(size_t* size, LogMsgBuf* lmb, bool reserveMemory)
{
return m_lr->toString(size, reserveMemory);
return m_lr->toString(size, lmb, reserveMemory);
}
const char* LogRecordImpl::getFormatedString(size_t* size)
......
......@@ -19,7 +19,7 @@ TEST(LogMsgFactory, LogMsgFactoryAPI)
IMetaDataCollections* meta = LogMsgFactory::createMetaDataCollections();
ASSERT_NE((void*)NULL, (void*)meta);
ILogRecord* record = LogMsgFactory::createLogRecord();
ILogRecord* record = LogMsgFactory::createLogRecord("LogRecordImpl",true);
ASSERT_NE((void*)NULL, (void*)record);
LogMsgFactory::destroy(colMeta);
......
......@@ -110,14 +110,14 @@ typedef struct _TestThreadInfo {
void* create(void* argv)
{
LogMsgLocalInit();
LogMsgBuf* lmb = new LogMsgBuf();
TestThreadInfo* info = (TestThreadInfo*)argv;
long matched = 0;
long mismatched = 0;
while (*(info->quit) == false) {
ILogRecord* sample = createLogRecord();
size_t pre_toString_size;
sample->toString(&pre_toString_size, true);
sample->toString(&pre_toString_size, lmb,true);
size_t sample_msg_size;
const char* sample_msg_content = sample->getFormatedString(&sample_msg_size);
......@@ -137,7 +137,7 @@ void* create(void* argv)
LogMsgFactory::destroy(sample);
}
std::cout << "matched " << matched << ", mismatched " << mismatched << std::endl;
LogMsgLocalDestroy();
delete lmb;
return NULL;
}
......@@ -147,10 +147,10 @@ TEST(LogRecordImpl, ConcurrencyToString)
{
bool quit = false;
/* Create one sample for copmare */
LogMsgInit();
LogMsgBuf* lmb = new LogMsgBuf();
ILogRecord* sample = createLogRecord();
size_t sample_msg_size;
const char* sample_msg_content = sample->toString(&sample_msg_size);
const char* sample_msg_content = sample->toString(&sample_msg_size,lmb);
/* ToString in multi-pthreads */
......@@ -172,7 +172,7 @@ TEST(LogRecordImpl, ConcurrencyToString)
for (int i = 0; i < Concurrency; i++)
pthread_join(*(testThreads + i), NULL);
delete[] testThreads;
LogMsgDestroy();
delete lmb;
}
/*
TEST(LogRecordImpl, ParseAndGet) {
......