提交 97b4a6af 编写于 作者: X xj0 提交者: LINGuanRen

[tableapi] support hbase api

上级 d1707e6a
......@@ -578,6 +578,7 @@ PCODE_DEF(OB_TABLE_API_LOGIN, 0x1101)
PCODE_DEF(OB_TABLE_API_EXECUTE, 0x1102)
PCODE_DEF(OB_TABLE_API_BATCH_EXECUTE, 0x1103)
PCODE_DEF(OB_TABLE_API_EXECUTE_QUERY, 0x1104)
PCODE_DEF(OB_TABLE_API_QUERY_AND_MUTATE, 0x1105)
// Event Job API
PCODE_DEF(OB_RUN_EVENT_JOB, 0x1201)
......
......@@ -255,8 +255,17 @@ ob_set_subtarget(ob_server table
table/ob_table_rpc_processor.cpp
table/ob_table_service.cpp
table/ob_table_api_row_iterator.cpp
table/ob_table_query_and_mutate_processor.cpp
table/ob_htable_filter_operator.cpp
table/ob_htable_filter_parser.cpp
table/ob_htable_utils.cpp
table/ob_htable_filters.cpp
table/htable_filter_tab.cxx
table/htable_filter_lex.cxx
)
set_source_files_properties(table/htable_filter_lex.cxx PROPERTIES COMPILE_FLAGS -Wno-null-conversion)
ob_server_add_pchs(observer
ob_server_struct.h
ob_uniq_task_queue.h
......
......@@ -41,6 +41,7 @@
#include "observer/table/ob_table_execute_processor.h"
#include "observer/table/ob_table_batch_execute_processor.h"
#include "observer/table/ob_table_query_processor.h"
#include "observer/table/ob_table_query_and_mutate_processor.h"
using namespace oceanbase;
using namespace oceanbase::observer;
......@@ -136,6 +137,7 @@ void oceanbase::observer::init_srv_xlator_for_others(ObSrvRpcXlator* xlator)
RPC_PROCESSOR(ObTableApiExecuteP, gctx_);
RPC_PROCESSOR(ObTableBatchExecuteP, gctx_);
RPC_PROCESSOR(ObTableQueryP, gctx_);
RPC_PROCESSOR(ObTableQueryAndMutateP, gctx_);
// HA GTS
RPC_PROCESSOR(ObHaGtsPingRequestP, gctx_);
......
#!/bin/bash
export PATH=/usr/local/bin:$PATH
# generate sql_parser
bison -v -Werror --defines=../../../src/observer/table/htable_filter_tab.hxx --output=../../../src/observer/table/htable_filter_tab.cxx ../../../src/observer/table/htable_filter_tab.yxx
if [ $? -ne 0 ]
then
echo yacc error[$?], abort.
exit 1
fi
flex --header-file="../../../src/observer/table/htable_filter_lex.hxx" --outfile="../../../src/observer/table/htable_filter_lex.cxx" ../../../src/observer/table/htable_filter_lex.lxx
此差异已折叠。
#ifndef ob_hfilter_HEADER_H
#define ob_hfilter_HEADER_H 1
#define ob_hfilter_IN_HEADER 1
#line 6 "../../../src/observer/table/htable_filter_lex.hxx"
#line 7 "../../../src/observer/table/htable_filter_lex.lxx"
#define USING_LOG_PREFIX SERVER
#include "observer/table/ob_htable_filter_parser.h"
#include "observer/table/ob_htable_filters.h"
#include "observer/table/htable_filter_tab.hxx"
using namespace oceanbase::common;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#line 18 "../../../src/observer/table/htable_filter_lex.hxx"
#define YY_INT_ALIGNED short int
/* A lexical scanner generated by flex */
#define FLEX_SCANNER
#define YY_FLEX_MAJOR_VERSION 2
#define YY_FLEX_MINOR_VERSION 5
#define YY_FLEX_SUBMINOR_VERSION 35
#if YY_FLEX_SUBMINOR_VERSION > 0
#define FLEX_BETA
#endif
/* First, we deal with platform-specific or compiler-specific issues. */
/* begin standard C headers. */
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
/* end standard C headers. */
/* flex integer type definitions */
#ifndef FLEXINT_H
#define FLEXINT_H
/* C99 systems have <inttypes.h>. Non-C99 systems may or may not. */
#if defined (__STDC_VERSION__) && __STDC_VERSION__ >= 199901L
/* C99 says to define __STDC_LIMIT_MACROS before including stdint.h,
* if you want the limit (max/min) macros for int types.
*/
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS 1
#endif
#include <inttypes.h>
typedef int8_t flex_int8_t;
typedef uint8_t flex_uint8_t;
typedef int16_t flex_int16_t;
typedef uint16_t flex_uint16_t;
typedef int32_t flex_int32_t;
typedef uint32_t flex_uint32_t;
#else
typedef signed char flex_int8_t;
typedef short int flex_int16_t;
typedef int flex_int32_t;
typedef unsigned char flex_uint8_t;
typedef unsigned short int flex_uint16_t;
typedef unsigned int flex_uint32_t;
#endif /* ! C99 */
/* Limits of integral types. */
#ifndef INT8_MIN
#define INT8_MIN (-128)
#endif
#ifndef INT16_MIN
#define INT16_MIN (-32767-1)
#endif
#ifndef INT32_MIN
#define INT32_MIN (-2147483647-1)
#endif
#ifndef INT8_MAX
#define INT8_MAX (127)
#endif
#ifndef INT16_MAX
#define INT16_MAX (32767)
#endif
#ifndef INT32_MAX
#define INT32_MAX (2147483647)
#endif
#ifndef UINT8_MAX
#define UINT8_MAX (255U)
#endif
#ifndef UINT16_MAX
#define UINT16_MAX (65535U)
#endif
#ifndef UINT32_MAX
#define UINT32_MAX (4294967295U)
#endif
#endif /* ! FLEXINT_H */
#ifdef __cplusplus
/* The "const" storage-class-modifier is valid. */
#define YY_USE_CONST
#else /* ! __cplusplus */
/* C99 requires __STDC__ to be defined as 1. */
#if defined (__STDC__)
#define YY_USE_CONST
#endif /* defined (__STDC__) */
#endif /* ! __cplusplus */
#ifdef YY_USE_CONST
#define yyconst const
#else
#define yyconst
#endif
/* An opaque pointer. */
#ifndef YY_TYPEDEF_YY_SCANNER_T
#define YY_TYPEDEF_YY_SCANNER_T
typedef void* yyscan_t;
#endif
/* For convenience, these vars (plus the bison vars far below)
are macros in the reentrant scanner. */
#define yyin yyg->yyin_r
#define yyout yyg->yyout_r
#define yyextra yyg->yyextra_r
#define yyleng yyg->yyleng_r
#define yytext yyg->yytext_r
#define yylineno (YY_CURRENT_BUFFER_LVALUE->yy_bs_lineno)
#define yycolumn (YY_CURRENT_BUFFER_LVALUE->yy_bs_column)
#define yy_flex_debug yyg->yy_flex_debug_r
/* Size of default input buffer. */
#ifndef YY_BUF_SIZE
#define YY_BUF_SIZE 16384
#endif
#ifndef YY_TYPEDEF_YY_BUFFER_STATE
#define YY_TYPEDEF_YY_BUFFER_STATE
typedef struct yy_buffer_state *YY_BUFFER_STATE;
#endif
#ifndef YY_TYPEDEF_YY_SIZE_T
#define YY_TYPEDEF_YY_SIZE_T
typedef size_t yy_size_t;
#endif
#ifndef YY_STRUCT_YY_BUFFER_STATE
#define YY_STRUCT_YY_BUFFER_STATE
struct yy_buffer_state
{
FILE *yy_input_file;
char *yy_ch_buf; /* input buffer */
char *yy_buf_pos; /* current position in input buffer */
/* Size of input buffer in bytes, not including room for EOB
* characters.
*/
yy_size_t yy_buf_size;
/* Number of characters read into yy_ch_buf, not including EOB
* characters.
*/
int yy_n_chars;
/* Whether we "own" the buffer - i.e., we know we created it,
* and can realloc() it to grow it, and should free() it to
* delete it.
*/
int yy_is_our_buffer;
/* Whether this is an "interactive" input source; if so, and
* if we're using stdio for input, then we want to use getc()
* instead of fread(), to make sure we stop fetching input after
* each newline.
*/
int yy_is_interactive;
/* Whether we're considered to be at the beginning of a line.
* If so, '^' rules will be active on the next match, otherwise
* not.
*/
int yy_at_bol;
int yy_bs_lineno; /**< The line count. */
int yy_bs_column; /**< The column count. */
/* Whether to try to fill the input buffer when we reach the
* end of it.
*/
int yy_fill_buffer;
int yy_buffer_status;
};
#endif /* !YY_STRUCT_YY_BUFFER_STATE */
void ob_hfilter_restart (FILE *input_file ,yyscan_t yyscanner );
void ob_hfilter__switch_to_buffer (YY_BUFFER_STATE new_buffer ,yyscan_t yyscanner );
YY_BUFFER_STATE ob_hfilter__create_buffer (FILE *file,int size ,yyscan_t yyscanner );
void ob_hfilter__delete_buffer (YY_BUFFER_STATE b ,yyscan_t yyscanner );
void ob_hfilter__flush_buffer (YY_BUFFER_STATE b ,yyscan_t yyscanner );
void ob_hfilter_push_buffer_state (YY_BUFFER_STATE new_buffer ,yyscan_t yyscanner );
void ob_hfilter_pop_buffer_state (yyscan_t yyscanner );
YY_BUFFER_STATE ob_hfilter__scan_buffer (char *base,yy_size_t size ,yyscan_t yyscanner );
YY_BUFFER_STATE ob_hfilter__scan_string (yyconst char *yy_str ,yyscan_t yyscanner );
YY_BUFFER_STATE ob_hfilter__scan_bytes (yyconst char *bytes,int len ,yyscan_t yyscanner );
void *ob_hfilter_alloc (yy_size_t ,yyscan_t yyscanner );
void *ob_hfilter_realloc (void *,yy_size_t ,yyscan_t yyscanner );
void ob_hfilter_free (void * ,yyscan_t yyscanner );
/* Begin user sect3 */
#define ob_hfilter_wrap(n) 1
#define YY_SKIP_YYWRAP
#define yytext_ptr yytext_r
#ifdef YY_HEADER_EXPORT_START_CONDITIONS
#define INITIAL 0
#define IN_STRING 1
#endif
#ifndef YY_NO_UNISTD_H
/* Special case for "unistd.h", since it is non-ANSI. We include it way
* down here because we want the user's section 1 to have been scanned first.
* The user has a chance to override it with an option.
*/
#include <unistd.h>
#endif
#define YY_EXTRA_TYPE oceanbase::table::ObHTableFilterParser *
int ob_hfilter_lex_init (yyscan_t* scanner);
int ob_hfilter_lex_init_extra (YY_EXTRA_TYPE user_defined,yyscan_t* scanner);
/* Accessor methods to globals.
These are made visible to non-reentrant scanners for convenience. */
int ob_hfilter_lex_destroy (yyscan_t yyscanner );
int ob_hfilter_get_debug (yyscan_t yyscanner );
void ob_hfilter_set_debug (int debug_flag ,yyscan_t yyscanner );
YY_EXTRA_TYPE ob_hfilter_get_extra (yyscan_t yyscanner );
void ob_hfilter_set_extra (YY_EXTRA_TYPE user_defined ,yyscan_t yyscanner );
FILE *ob_hfilter_get_in (yyscan_t yyscanner );
void ob_hfilter_set_in (FILE * in_str ,yyscan_t yyscanner );
FILE *ob_hfilter_get_out (yyscan_t yyscanner );
void ob_hfilter_set_out (FILE * out_str ,yyscan_t yyscanner );
int ob_hfilter_get_leng (yyscan_t yyscanner );
char *ob_hfilter_get_text (yyscan_t yyscanner );
int ob_hfilter_get_lineno (yyscan_t yyscanner );
void ob_hfilter_set_lineno (int line_number ,yyscan_t yyscanner );
YYSTYPE * ob_hfilter_get_lval (yyscan_t yyscanner );
void ob_hfilter_set_lval (YYSTYPE * yylval_param ,yyscan_t yyscanner );
YYLTYPE *ob_hfilter_get_lloc (yyscan_t yyscanner );
void ob_hfilter_set_lloc (YYLTYPE * yylloc_param ,yyscan_t yyscanner );
/* Macros after this point can all be overridden by user definitions in
* section 1.
*/
#ifndef YY_SKIP_YYWRAP
#ifdef __cplusplus
extern "C" int ob_hfilter_wrap (yyscan_t yyscanner );
#else
extern int ob_hfilter_wrap (yyscan_t yyscanner );
#endif
#endif
#ifndef yytext_ptr
static void yy_flex_strncpy (char *,yyconst char *,int ,yyscan_t yyscanner);
#endif
#ifdef YY_NEED_STRLEN
static int yy_flex_strlen (yyconst char * ,yyscan_t yyscanner);
#endif
#ifndef YY_NO_INPUT
#endif
/* Amount of stuff to slurp up with each read. */
#ifndef YY_READ_BUF_SIZE
#define YY_READ_BUF_SIZE 8192
#endif
/* Number of entries by which start-condition stack grows. */
#ifndef YY_START_STACK_INCR
#define YY_START_STACK_INCR 25
#endif
/* Default declaration of generated scanner - a define so the user can
* easily add parameters.
*/
#ifndef YY_DECL
#define YY_DECL_IS_OURS 1
extern int ob_hfilter_lex \
(YYSTYPE * yylval_param,YYLTYPE * yylloc_param ,yyscan_t yyscanner);
#define YY_DECL int ob_hfilter_lex \
(YYSTYPE * yylval_param, YYLTYPE * yylloc_param , yyscan_t yyscanner)
#endif /* !YY_DECL */
/* yy_get_previous_state - get the state just before the EOB char was reached */
#undef YY_NEW_FILE
#undef YY_FLUSH_BUFFER
#undef yy_set_bol
#undef yy_new_buffer
#undef yy_set_interactive
#undef YY_DO_BEFORE_ACTION
#ifdef YY_DECL_IS_OURS
#undef YY_DECL_IS_OURS
#undef YY_DECL
#endif
#line 106 "../../../src/observer/table/htable_filter_lex.lxx"
#line 353 "../../../src/observer/table/htable_filter_lex.hxx"
#undef ob_hfilter_IN_HEADER
#endif /* ob_hfilter_HEADER_H */
%option yylineno case-insensitive reentrant
%option bison-bridge bison-locations
%option noyyalloc noyyrealloc noyyfree noyywrap nounput noinput
%option extra-type="oceanbase::table::ObHTableFilterParser *"
%option prefix="ob_hfilter_"
%top{
#define USING_LOG_PREFIX SERVER
#include "observer/table/ob_htable_filter_parser.h"
#include "observer/table/ob_htable_filters.h"
#include "observer/table/htable_filter_tab.hxx"
using namespace oceanbase::common;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
}
%x IN_STRING
ID ([A-Za-z0-9$_]*)
INT [0-9]+
SQUOTE '
%%
/* rules */
SKIP { return SKIP; }
WHILE { return WHILE; }
AND { return AND; }
OR { return OR; }
"=" { return EQUAL; }
"!=" { return NOT_EQUAL; }
">=" { return GREATER_OR_EQUAL; }
">" { return GREATER; }
"<=" { return LESS_OR_EQUAL; }
"<" { return LESS; }
NO_OP { return NO_OP; }
TRUE {
yylval->ival = 1;
return BOOL_VALUE;
}
FALSE {
yylval->ival = 0;
return BOOL_VALUE;
}
{INT} {
errno = 0;
yylval->lval = strtoll(yytext, NULL, 10);
if (ERANGE == errno)
{
ob_hfilter_error(yylloc, yyextra, "integar value out of range");
}
return INT_VALUE;
}
{SQUOTE} {
BEGIN(IN_STRING);
char *buf = static_cast<char*>(yyextra->alloc(yyextra->get_input_len()+1));
if (NULL == buf) {
yyextra->error_code_ = oceanbase::common::OB_ALLOCATE_MEMORY_FAILED;
ob_hfilter_error(yylloc, yyextra, "no memory");
return ERROR;
}
yylval->sval.len_ = 0;
yylval->sval.str_ = buf;
}
<IN_STRING>[^']+ {
memcpy(yylval->sval.str_+yylval->sval.len_, yytext, yyleng);
yylval->sval.len_ += yyleng;
}
<IN_STRING>{SQUOTE}{SQUOTE} {
yylval->sval.str_[yylval->sval.len_++] = '\'';
}
<IN_STRING>{SQUOTE} {
BEGIN(INITIAL);
return STRING_VALUE;
}
<IN_STRING><<EOF>> {
ob_hfilter_error(yylloc, yyextra, "unterminated quoted string");
return ERROR;
}
RowFilter { return RowFilter; }
ValueFilter { return ValueFilter; }
QualifierFilter { return QualifierFilter; }
SingleColumnValueFilter { return SingleColumnValueFilter; }
PageFilter { return PageFilter; }
ColumnCountGetFilter { return ColumnCountGetFilter; }
CheckAndMutateFilter { return CheckAndMutateFilter; }
PrefixFilter { return PrefixFilter; }
[(),] {
return yytext[0];
}
[ \t\r\n] {/*skip*/}
<<EOF>> {
return END;
}
. {
ob_hfilter_error(yylloc, yyextra, "mystery charactor '%c'", *yytext);
return ERROR;
}
%%
/* user code */
#pragma GCC diagnostic pop
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -50,6 +50,9 @@ private:
int multi_replace();
int multi_update();
int batch_execute(bool is_readonly);
int htable_delete();
int htable_put();
int htable_mutate_row();
private:
static const int64_t COMMON_COLUMN_NUM = 16;
table::ObTableEntityFactory<table::ObTableEntity> default_entity_factory_;
......
......@@ -64,13 +64,18 @@ int ObTableApiExecuteP::deserialize()
arg_.table_operation_.set_entity(request_entity_);
result_.set_entity(result_entity_);
int ret = ParentType::deserialize();
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
// @note modify the timestamp to be negative
ret = ObTableRpcProcessorUtil::negate_htable_timestamp(request_entity_);
}
return ret;
}
int ObTableApiExecuteP::check_arg()
{
int ret = OB_SUCCESS;
if (arg_.consistency_level_ != ObTableConsistencyLevel::STRONG) {
if (!(arg_.consistency_level_ == ObTableConsistencyLevel::STRONG ||
arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("some options not supported yet", K(ret),
"consistency_level", arg_.consistency_level_,
......@@ -207,7 +212,13 @@ int ObTableApiExecuteP::response(const int retcode)
{
int ret = OB_SUCCESS;
if (!need_retry_in_queue_ && !did_async_end_trans()) {
ret = ObRpcProcessor::response(retcode);
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
// @note modify the value of timestamp to be positive
ret = ObTableRpcProcessorUtil::negate_htable_timestamp(result_entity_);
}
if (OB_SUCC(ret)) {
ret = ObRpcProcessor::response(retcode);
}
}
return ret;
}
......@@ -255,6 +266,7 @@ int ObTableApiExecuteP::process_get()
arg_.entity_type_,
arg_.binlog_row_image_type_);
const bool is_readonly = true;
const ObTableConsistencyLevel consistency_level = arg_.consistency_level_;
ObRowkey rowkey = const_cast<ObITableEntity&>(arg_.table_operation_.entity()).get_rowkey();
ObSEArray<int64_t, 1> part_ids;
if (OB_FAIL(check_arg2())) {
......@@ -264,7 +276,7 @@ int ObTableApiExecuteP::process_get()
LOG_WARN("failed to get partition id", K(ret));
} else if (OB_FAIL(part_ids.push_back(get_ctx_.param_partition_id()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_SELECT, table_id, part_ids, get_timeout_ts()))) {
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_SELECT, consistency_level, table_id, part_ids, get_timeout_ts()))) {
LOG_WARN("failed to start readonly transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_get(get_ctx_, arg_.table_operation_, result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
......
此差异已折叠。
此差异已折叠。
......@@ -29,6 +29,7 @@ public:
RPC_S(PR5 execute, obrpc::OB_TABLE_API_EXECUTE, (table::ObTableOperationRequest), table::ObTableOperationResult);
RPC_S(PR5 batch_execute, obrpc::OB_TABLE_API_BATCH_EXECUTE, (table::ObTableBatchOperationRequest), table::ObTableBatchOperationResult);
RPC_SS(PR5 execute_query, obrpc::OB_TABLE_API_EXECUTE_QUERY, (table::ObTableQueryRequest), table::ObTableQueryResult);
RPC_S(PR5 query_and_mutate, obrpc::OB_TABLE_API_QUERY_AND_MUTATE, (table::ObTableQueryAndMutateRequest), table::ObTableQueryAndMutateResult);
};
}; // end namespace obrpc
......
......@@ -76,3 +76,12 @@ OB_SERIALIZE_MEMBER(ObTableQueryRequest,
consistency_level_,
query_
);
////////////////////////////////////////////////////////////////
OB_SERIALIZE_MEMBER(ObTableQueryAndMutateRequest,
credential_,
table_name_,
table_id_,
partition_id_,
entity_type_,
query_and_mutate_);
\ No newline at end of file
......@@ -5,3 +5,4 @@ ob_unittest(test_worker_pool omt/test_worker_pool.cpp)
ob_unittest(test_token_calcer omt/test_token_calcer.cpp)
ob_unittest(test_information_schema)
ob_unittest(test_tableapi tableapi/test_tableapi.cpp)
ob_unittest(test_hbaseapi hbaseapi/test_hfilter_parser.cpp)
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册