From 12d2ea8abe3ca4388bbd03346ea5a86c0b753c48 Mon Sep 17 00:00:00 2001 From: enhellowhy Date: Wed, 16 Oct 2019 12:18:21 +0800 Subject: [PATCH] feat: restructure ripple splitter module. --- common/jsonrpc/rpc.go | 4 +- config/splitter_example.conf | 57 +++ config/splitter_settings.go | 7 +- service/meta.go | 33 +- .../model/{xrp/block.go => ripple/ledger.go} | 13 +- service/model/ripple/meta.go | 58 +++ .../model/ripple/transaction_account_set.go | 17 + .../ripple/transaction_deposit_preauth.go | 12 + .../model/ripple/transaction_escrow_cancel.go | 12 + .../model/ripple/transaction_escrow_create.go | 16 + .../model/ripple/transaction_escrow_finish.go | 14 + .../model/ripple/transaction_offer_cancel.go | 11 + .../model/ripple/transaction_offer_create.go | 14 + service/model/ripple/transaction_payment.go | 17 + .../transaction_payment_channel_claim.go | 15 + .../transaction_payment_channel_create.go | 16 + .../transaction_payment_channel_fund.go | 13 + .../ripple/transaction_set_regular_key.go | 11 + .../ripple/transaction_signer_list_set.go | 12 + service/model/ripple/transaction_trust_set.go | 13 + service/model/xrp/account.go | 22 - service/model/xrp/amount.go | 20 - service/model/xrp/meta.go | 30 -- service/model/xrp/path.go | 18 - service/model/xrp/transaction.go | 92 ---- .../model/xrp/transaction_affected_nodes.go | 19 - splitter/ripple/handler_rpc.go | 79 +++ splitter/ripple/handler_rpc_test.go | 33 ++ splitter/ripple/job.go | 178 +++++++ splitter/ripple/meta.go | 64 +++ splitter/ripple/method.go | 262 ++++++++++ splitter/ripple/schema.json | 2 + splitter/ripple/splitter.go | 468 ++++++++++++++++++ splitter/{xrp => ripple}/worker.go | 12 +- splitter/splitter.go | 6 +- splitter/xrp/handler_rpc.go | 126 ----- splitter/xrp/job.go | 250 ---------- splitter/xrp/meta.go | 49 -- splitter/xrp/method.go | 266 ---------- splitter/xrp/splitter.go | 404 --------------- 40 files changed, 1435 insertions(+), 1330 deletions(-) rename service/model/{xrp/block.go => ripple/ledger.go} (86%) create mode 100644 service/model/ripple/meta.go create mode 100644 service/model/ripple/transaction_account_set.go create mode 100644 service/model/ripple/transaction_deposit_preauth.go create mode 100644 service/model/ripple/transaction_escrow_cancel.go create mode 100644 service/model/ripple/transaction_escrow_create.go create mode 100644 service/model/ripple/transaction_escrow_finish.go create mode 100644 service/model/ripple/transaction_offer_cancel.go create mode 100644 service/model/ripple/transaction_offer_create.go create mode 100644 service/model/ripple/transaction_payment.go create mode 100644 service/model/ripple/transaction_payment_channel_claim.go create mode 100644 service/model/ripple/transaction_payment_channel_create.go create mode 100644 service/model/ripple/transaction_payment_channel_fund.go create mode 100644 service/model/ripple/transaction_set_regular_key.go create mode 100644 service/model/ripple/transaction_signer_list_set.go create mode 100644 service/model/ripple/transaction_trust_set.go delete mode 100644 service/model/xrp/account.go delete mode 100644 service/model/xrp/amount.go delete mode 100644 service/model/xrp/meta.go delete mode 100644 service/model/xrp/path.go delete mode 100644 service/model/xrp/transaction.go delete mode 100644 service/model/xrp/transaction_affected_nodes.go create mode 100644 splitter/ripple/handler_rpc.go create mode 100644 splitter/ripple/handler_rpc_test.go create mode 100644 splitter/ripple/job.go create mode 100644 splitter/ripple/meta.go create mode 100644 splitter/ripple/method.go create mode 100644 splitter/ripple/schema.json create mode 100644 splitter/ripple/splitter.go rename splitter/{xrp => ripple}/worker.go (80%) delete mode 100644 splitter/xrp/handler_rpc.go delete mode 100644 splitter/xrp/job.go delete mode 100644 splitter/xrp/meta.go delete mode 100644 splitter/xrp/method.go delete mode 100644 splitter/xrp/splitter.go diff --git a/common/jsonrpc/rpc.go b/common/jsonrpc/rpc.go index 1f98e52..05331fd 100644 --- a/common/jsonrpc/rpc.go +++ b/common/jsonrpc/rpc.go @@ -86,8 +86,8 @@ func (c *Client) CallXRP(method string, params ...interface{}) ([]byte, error) { errorMap := json.GetBytes(response, "error").Map() if len(errorMap) != 0 { - code := json.GetBytes(response, "error.code").Int() - msg := json.GetBytes(response, "error.message").String() + code := json.GetBytes(response, "error_code").Int() + msg := json.GetBytes(response, "error_message").String() return nil, errors.New(fmt.Sprintf("error: %d %s", code, msg)) } diff --git a/config/splitter_example.conf b/config/splitter_example.conf index 7b045bf..08d1710 100644 --- a/config/splitter_example.conf +++ b/config/splitter_example.conf @@ -555,6 +555,63 @@ max_idle_conns = 100 sql_log_file = /var/log/bds-splitter/xlm-sql.log debug = false +# =============================== xrp ================================== +[xrp] +#是否开启 xrp 数据splitter +enable = true +#是否开启数据库 +database_enable = true +#数据库worker缓存大小 +database_worker_buffer = 8192 +#数据库worker数量 +database_worker_number = 20 +#一次请求区块链数据的最大块数,400000块之前可以设置大一些,比如300 +max_batch_block = 1000 +#xrp 全节点的地址 +endpoint = http://[xrp 全节点的ip/域名]:[xrp 全节点运行端口] +#xrp 数据校验规则文件地址 +json_schema_file = /etc/bds-splitter/schema/xrp.json↩ +#xrp 数据校验是否开启 +json_schema_validation_enable = false + +#xrp 定时任务配置 +[cron.xrp] +update_meta_expr = @every 1m +get_batch_ledger_expr = @every 1m + +#xrp kafka 配置 +[kafka.xrp] +enable = true +topic = xrp +# kafka 客户端标示 +client_id = xrp-client-1 +# kafka 消费组标示 +group_id = xrp-group +# kafka 服务的地址 +broker_list = [kafka 服务的ip/域名]:[kafka 服务的运行端口] +buffer_size = 1000 +return_errors = true + +#xrp 数据库配置 +[database.xrp] +#数据库类型,sql server为mssql,postgre为postgres +type = postgres +#数据库的访问地址 +host = [数据库服务的ip/域名] +#数据库的端口信息 +port = [数据库服务的端口] +#数据库的库名,需要初始化好,创建表和导入数据用 +database = [数据库服务的库] +#数据库的访问账号 +user = [数据库服务的账号] +#数据库的访问账号密码信息 +password = [数据库服务的密码] +timezone = Asia/Shanghai +max_open_conns = 500 +max_idle_conns = 100 +sql_log_file = /var/log/bds-splitter/xrp-sql.log +debug = false + # =============================== log ================================== #普通日志配置 diff --git a/config/splitter_settings.go b/config/splitter_settings.go index 98ffbe5..76a5795 100644 --- a/config/splitter_settings.go +++ b/config/splitter_settings.go @@ -231,8 +231,6 @@ type XRP struct { DatabaseEnable bool `ini:"database_enable"` DatabaseWorkerBuffer int `ini:"database_worker_buffer"` DatabaseWorkerNumber int `ini:"database_worker_number"` - SkipHeight int `ini:"skip_height"` - SkipMissBlock bool `ini:"skip_miss_block"` MaxBatchBlock int `ini:"max_batch_block"` Endpoint string `ini:"endpoint"` KafkaProxyHost string `ini:"kafka_proxy_host"` @@ -303,9 +301,8 @@ type CronTRON struct { UpdateMetaExpr string `ini:"update_meta_expr"` } type CronXRP struct { - UpdateMetaExpr string `ini:"update_meta_expr"` - GetBatchBlockExpr string `ini:"get_batch_block_expr"` - RefreshPoolNameExpr string `ini:"refresh_pool_name_expr"` + UpdateMetaExpr string `ini:"update_meta_expr"` + GetBatchLedgerExpr string `ini:"get_batch_ledger_expr"` } type CronXLM struct { diff --git a/service/meta.go b/service/meta.go index bb9647b..55422c5 100644 --- a/service/meta.go +++ b/service/meta.go @@ -14,9 +14,9 @@ import ( "github.com/jdcloud-bds/bds/service/model/etc" "github.com/jdcloud-bds/bds/service/model/eth" "github.com/jdcloud-bds/bds/service/model/ltc" + "github.com/jdcloud-bds/bds/service/model/ripple" "github.com/jdcloud-bds/bds/service/model/tron" "github.com/jdcloud-bds/bds/service/model/xlm" - "github.com/jdcloud-bds/bds/service/model/xrp" ) var ( @@ -130,14 +130,23 @@ var ( new(eos.Transaction).TableName(): new(eos.Transaction), new(eos.Action).TableName(): new(eos.Action), - // xrp - new(xrp.Meta).TableName(): new(xrp.Meta), - new(xrp.Block).TableName(): new(xrp.Block), - new(xrp.Transaction).TableName(): new(xrp.Transaction), - new(xrp.Path).TableName(): new(xrp.Path), - new(xrp.Account).TableName(): new(xrp.Account), - new(xrp.AffectedNodes).TableName(): new(xrp.AffectedNodes), - new(xrp.Amount).TableName(): new(xrp.Amount), + // ripple + new(ripple.Meta).TableName(): new(ripple.Meta), + new(ripple.Ledger).TableName(): new(ripple.Ledger), + new(ripple.AccountSet).TableName(): new(ripple.AccountSet), + new(ripple.DepositPreauth).TableName(): new(ripple.DepositPreauth), + new(ripple.EscrowCancel).TableName(): new(ripple.EscrowCancel), + new(ripple.EscrowCreate).TableName(): new(ripple.EscrowCreate), + new(ripple.EscrowFinish).TableName(): new(ripple.EscrowFinish), + new(ripple.OfferCancel).TableName(): new(ripple.OfferCancel), + new(ripple.OfferCreate).TableName(): new(ripple.OfferCreate), + new(ripple.Payment).TableName(): new(ripple.Payment), + new(ripple.PaymentChannelClaim).TableName(): new(ripple.PaymentChannelClaim), + new(ripple.PaymentChannelCreate).TableName(): new(ripple.PaymentChannelCreate), + new(ripple.PaymentChannelFund).TableName(): new(ripple.PaymentChannelFund), + new(ripple.SetRegularKey).TableName(): new(ripple.SetRegularKey), + new(ripple.SignerListSet).TableName(): new(ripple.SignerListSet), + new(ripple.TrustSet).TableName(): new(ripple.TrustSet), // doge new(doge.Meta).TableName(): new(doge.Meta), @@ -456,13 +465,13 @@ func CheckEOSTable(engine *xorm.Engine) error { } func CheckXRPTable(engine *xorm.Engine) error { - err := syncTable(engine, fmt.Sprintf("%s_meta", xrp.TablePrefix), nil) + err := syncTable(engine, fmt.Sprintf("%s_meta", ripple.TablePrefix), nil) if err != nil { return err } fn := func(tableName string) error { - meta := new(xrp.Meta) + meta := new(ripple.Meta) meta.Name = engine.TableName(tableName) if meta.Name != meta.TableName() { has, err := engine.Get(meta) @@ -482,7 +491,7 @@ func CheckXRPTable(engine *xorm.Engine) error { return nil } - err = syncTable(engine, xrp.TablePrefix, fn) + err = syncTable(engine, ripple.TablePrefix, fn) if err != nil { return err } diff --git a/service/model/xrp/block.go b/service/model/ripple/ledger.go similarity index 86% rename from service/model/xrp/block.go rename to service/model/ripple/ledger.go index b09111b..a01c719 100644 --- a/service/model/xrp/block.go +++ b/service/model/ripple/ledger.go @@ -1,14 +1,15 @@ -package xrp +package ripple import ( "github.com/jdcloud-bds/bds/common/math" ) -type Block struct { +type Ledger struct { ID int64 `xorm:"id bigint autoincr pk"` Accepted int `xorm:"accepted tinyint notnull"` - AccountHash string `xorm:"account_hash varchar(128) notnull index"` + AccountHash string `xorm:"account_hash varchar(128) notnull"` CloseFlags int `xorm:"close_flags tinyint notnull"` + Timestamp int64 `xorm:"timestamp int notnull index"` CloseTime int64 `xorm:"close_time int notnull"` CloseTimeHuman string `xorm:"close_time_human varchar(56) notnull"` CloseTimeResolution int `xorm:"close_time_resolution int notnull"` @@ -18,12 +19,12 @@ type Block struct { LedgerIndex int64 `xorm:"ledger_index bigint unique notnull index"` ParentCloseTime int64 `xorm:"parent_close_time int notnull"` ParentHash string `xorm:"parent_hash varchar(128) notnull"` - SeqNum int64 `xorm:"seq_num bigint notnull index"` + SeqNum int64 `xorm:"seq_num bigint notnull"` TotalCoins math.HexOrDecimal256 `xorm:"total_coins decimal(38,0) null"` TransactionHash string `xorm:"transaction_hash varchar(128) notnull"` TransactionLength int `xorm:"tx_len int notnull"` } -func (t Block) TableName() string { - return tableName("block") +func (t Ledger) TableName() string { + return tableName("ledger") } diff --git a/service/model/ripple/meta.go b/service/model/ripple/meta.go new file mode 100644 index 0000000..8aff968 --- /dev/null +++ b/service/model/ripple/meta.go @@ -0,0 +1,58 @@ +package ripple + +import ( + "fmt" + "time" +) + +const ( + TablePrefix = "ripple" +) + +type Meta struct { + ID int64 `xorm:"id bigint autoincr pk"` + Name string `xorm:"name varchar(255) notnull unique"` + LastID int64 `xorm:"last_id bigint notnull"` + Count int64 `xorm:"count bigint notnull"` + CreatedTime time.Time `xorm:"created_time created notnull"` + UpdatedTime time.Time `xorm:"updated_time updated notnull"` +} + +func (t Meta) TableName() string { + return tableName("meta") +} + +func tableName(s string) string { + if len(TablePrefix) == 0 { + return s + } + return fmt.Sprintf("%s_%s", TablePrefix, s) +} + +type TransactionCommonFields struct { + ID int64 `xorm:"id bigint autoincr pk"` + // The type of transaction. Valid types include: Payment, OfferCreate, OfferCancel, TrustSet, + // AccountSet, SetRegularKey, SignerListSet, EscrowCreate, EscrowFinish, EscrowCancel, + // PaymentChannelCreate, PaymentChannelFund, PaymentChannelClaim, and DepositPreauth. + TransactionType string `xorm:"transaction_type char(32) notnull index"` + Account string `xorm:"account varchar(68) notnull index"` + Hash string `xorm:"hash varchar(128) notnull index"` + LedgerIndex int64 `xorm:"ledger_index bigint index"` + Timestamp int64 `xorm:"timestamp int notnull index"` + Fee int64 `xorm:"fee bigint notnull"` //in drops + Sequence int64 `xorm:"sequence bigint notnull"` + AccountTxnID string `xorm:"account_txn_id varchar(128) null"` + Flags int64 `xorm:"flags bigint null"` + LastLedgerSequence int64 `xorm:"last_ledger_sequence bigint null"` + Memos string `xorm:"memos text null"` + Signers string `xorm:"signers text null"` + SourceTag int64 `xorm:"source_tag bigint null"` + SigningPubKey string `xorm:"signing_pub_key varchar(128) null"` + TxnSignature string `xorm:"txn_signature varchar(256) null"` + + //metadata + TransactionResult string `xorm:"transaction_result varchar(32) null"` + TransactionIndex int `xorm:"transaction_index bigint null"` + AffectedNodes string `xorm:"affected_nodes text null"` + DeliveredAmount string `xorm:"delivered_amount text null"` +} diff --git a/service/model/ripple/transaction_account_set.go b/service/model/ripple/transaction_account_set.go new file mode 100644 index 0000000..58ecf42 --- /dev/null +++ b/service/model/ripple/transaction_account_set.go @@ -0,0 +1,17 @@ +package ripple + +type AccountSet struct { + TransactionCommonFields `xorm:"extends"` + + ClearFlag int64 `xorm:"clear_flags bigint null"` + Domain string `xorm:"domain varchar(1024) null"` + EmailHash string `xorm:"email_hash char(128) null"` + MessageKey string `xorm:"message_key varchar(128) null"` + SetFlag int64 `xorm:"set_flag bigint null"` + TransferRate int64 `xorm:"transfer_rate bigint null"` + TickSize int `xorm:"tick_size int null"` +} + +func (t AccountSet) TableName() string { + return tableName("transaction_account_set") +} diff --git a/service/model/ripple/transaction_deposit_preauth.go b/service/model/ripple/transaction_deposit_preauth.go new file mode 100644 index 0000000..90ee9f4 --- /dev/null +++ b/service/model/ripple/transaction_deposit_preauth.go @@ -0,0 +1,12 @@ +package ripple + +type DepositPreauth struct { + TransactionCommonFields `xorm:"extends"` + + Authorize string `xorm:"authorize varchar(68) null"` + UnAuthorize string `xorm:"un_authorize varchar(68) null"` +} + +func (t DepositPreauth) TableName() string { + return tableName("transaction_deposit_preauth") +} diff --git a/service/model/ripple/transaction_escrow_cancel.go b/service/model/ripple/transaction_escrow_cancel.go new file mode 100644 index 0000000..ce625a3 --- /dev/null +++ b/service/model/ripple/transaction_escrow_cancel.go @@ -0,0 +1,12 @@ +package ripple + +type EscrowCancel struct { + TransactionCommonFields `xorm:"extends"` + + Owner string `xorm:"owner varchar(68) null"` + OfferSequence int64 `xorm:"offer_sequence bigint null"` +} + +func (t EscrowCancel) TableName() string { + return tableName("transaction_escrow_cancel") +} diff --git a/service/model/ripple/transaction_escrow_create.go b/service/model/ripple/transaction_escrow_create.go new file mode 100644 index 0000000..d4864da --- /dev/null +++ b/service/model/ripple/transaction_escrow_create.go @@ -0,0 +1,16 @@ +package ripple + +type EscrowCreate struct { + TransactionCommonFields `xorm:"extends"` + + Amount string `xorm:"amount text notnull"` + Destination string `xorm:"destination varchar(68) notnull"` + CancelAfter int64 `xorm:"cancel_after bigint null"` + FinishAfter int64 `xorm:"finish_after bigint null"` + Condition string `xorm:"condition varchar(512) null"` + DestinationTag int64 `xorm:"destination_tag bigint null"` +} + +func (t EscrowCreate) TableName() string { + return tableName("transaction_escrow_create") +} diff --git a/service/model/ripple/transaction_escrow_finish.go b/service/model/ripple/transaction_escrow_finish.go new file mode 100644 index 0000000..8565b1a --- /dev/null +++ b/service/model/ripple/transaction_escrow_finish.go @@ -0,0 +1,14 @@ +package ripple + +type EscrowFinish struct { + TransactionCommonFields `xorm:"extends"` + + Owner string `xorm:"owner varchar(68) null"` + OfferSequence int64 `xorm:"offer_sequence bigint null"` + Condition string `xorm:"condition varchar(512) null"` + Fulfillment string `xorm:"fulfillment varchar(512) null"` +} + +func (t EscrowFinish) TableName() string { + return tableName("transaction_escrow_finish") +} diff --git a/service/model/ripple/transaction_offer_cancel.go b/service/model/ripple/transaction_offer_cancel.go new file mode 100644 index 0000000..90af089 --- /dev/null +++ b/service/model/ripple/transaction_offer_cancel.go @@ -0,0 +1,11 @@ +package ripple + +type OfferCancel struct { + TransactionCommonFields `xorm:"extends"` + + OfferSequence int64 `xorm:"offer_sequence bigint notnull"` +} + +func (t OfferCancel) TableName() string { + return tableName("transaction_offer_cancel") +} diff --git a/service/model/ripple/transaction_offer_create.go b/service/model/ripple/transaction_offer_create.go new file mode 100644 index 0000000..acbcc0f --- /dev/null +++ b/service/model/ripple/transaction_offer_create.go @@ -0,0 +1,14 @@ +package ripple + +type OfferCreate struct { + TransactionCommonFields `xorm:"extends"` + + Expiration int64 `xorm:"expiration int null"` + OfferSequence int64 `xorm:"offer_sequence bigint null"` + TakerGets string `xorm:"taker_gets text notnull"` + TakerPays string `xorm:"taker_pays text notnull"` +} + +func (t OfferCreate) TableName() string { + return tableName("transaction_offer_create") +} diff --git a/service/model/ripple/transaction_payment.go b/service/model/ripple/transaction_payment.go new file mode 100644 index 0000000..ca574bf --- /dev/null +++ b/service/model/ripple/transaction_payment.go @@ -0,0 +1,17 @@ +package ripple + +type Payment struct { + TransactionCommonFields `xorm:"extends"` + + Amount string `xorm:"amount text notnull"` + Destination string `xorm:"destination varchar(68) notnull"` + DestinationTag int64 `xorm:"destination_tag bigint null"` + InvoiceID string `xorm:"invoice_id varchar(128) null"` + Paths string `xorm:"paths text null"` + SendMax string `xorm:"send_max text null"` + DeliverMin string `xorm:"deliver_min text null"` +} + +func (t Payment) TableName() string { + return tableName("transaction_payment") +} diff --git a/service/model/ripple/transaction_payment_channel_claim.go b/service/model/ripple/transaction_payment_channel_claim.go new file mode 100644 index 0000000..cf0c239 --- /dev/null +++ b/service/model/ripple/transaction_payment_channel_claim.go @@ -0,0 +1,15 @@ +package ripple + +type PaymentChannelClaim struct { + TransactionCommonFields `xorm:"extends"` + + Channel string `xorm:"channel char(64) notnull"` + Amount string `xorm:"amount text null"` + Balance string `xorm:"balance text null"` + PublicKey string `xorm:"public_key char(66) null"` + Signature string `xorm:"signature text null"` +} + +func (t PaymentChannelClaim) TableName() string { + return tableName("transaction_payment_channel_claim") +} diff --git a/service/model/ripple/transaction_payment_channel_create.go b/service/model/ripple/transaction_payment_channel_create.go new file mode 100644 index 0000000..daa3240 --- /dev/null +++ b/service/model/ripple/transaction_payment_channel_create.go @@ -0,0 +1,16 @@ +package ripple + +type PaymentChannelCreate struct { + TransactionCommonFields `xorm:"extends"` + + Amount string `xorm:"amount text notnull"` + Destination string `xorm:"destination varchar(68) notnull"` + SettleDelay int64 `xorm:"settleDelay bigint notnull"` + PublicKey string `xorm:"public_key char(66) notnull"` + CancelAfter int64 `xorm:"cancel_after bigint null"` + DestinationTag int64 `xorm:"destination_tag bigint null"` +} + +func (t PaymentChannelCreate) TableName() string { + return tableName("transaction_payment_channel_create") +} diff --git a/service/model/ripple/transaction_payment_channel_fund.go b/service/model/ripple/transaction_payment_channel_fund.go new file mode 100644 index 0000000..1b9f7f0 --- /dev/null +++ b/service/model/ripple/transaction_payment_channel_fund.go @@ -0,0 +1,13 @@ +package ripple + +type PaymentChannelFund struct { + TransactionCommonFields `xorm:"extends"` + + Channel string `xorm:"channel char(64) notnull"` + Amount string `xorm:"amount text notnull"` + Expiration int64 `xorm:"expiration int null"` +} + +func (t PaymentChannelFund) TableName() string { + return tableName("transaction_payment_channel_fund") +} diff --git a/service/model/ripple/transaction_set_regular_key.go b/service/model/ripple/transaction_set_regular_key.go new file mode 100644 index 0000000..f1c0479 --- /dev/null +++ b/service/model/ripple/transaction_set_regular_key.go @@ -0,0 +1,11 @@ +package ripple + +type SetRegularKey struct { + TransactionCommonFields `xorm:"extends"` + + RegularKey string `xorm:"regular_key varchar(68) null"` +} + +func (t SetRegularKey) TableName() string { + return tableName("transaction_set_regular_key") +} diff --git a/service/model/ripple/transaction_signer_list_set.go b/service/model/ripple/transaction_signer_list_set.go new file mode 100644 index 0000000..8bfae1b --- /dev/null +++ b/service/model/ripple/transaction_signer_list_set.go @@ -0,0 +1,12 @@ +package ripple + +type SignerListSet struct { + TransactionCommonFields `xorm:"extends"` + + SignerQuorum int64 `xorm:"signer_quorum bigint notnull"` + SignerEntries string `xorm:"signer_entries text null"` +} + +func (t SignerListSet) TableName() string { + return tableName("transaction_set_regular_key") +} diff --git a/service/model/ripple/transaction_trust_set.go b/service/model/ripple/transaction_trust_set.go new file mode 100644 index 0000000..431938f --- /dev/null +++ b/service/model/ripple/transaction_trust_set.go @@ -0,0 +1,13 @@ +package ripple + +type TrustSet struct { + TransactionCommonFields `xorm:"extends"` + + LimitAmount string `xorm:"limit_amount text notnull"` + QualityIn int64 `xorm:"quality_in bigint null"` + QualityOut int64 `xorm:"quality_out bigint null"` +} + +func (t TrustSet) TableName() string { + return tableName("transaction_trust_set") +} diff --git a/service/model/xrp/account.go b/service/model/xrp/account.go deleted file mode 100644 index ba954de..0000000 --- a/service/model/xrp/account.go +++ /dev/null @@ -1,22 +0,0 @@ -package xrp - -import ( - "github.com/jdcloud-bds/bds/common/math" -) - -type Account struct { - ID int64 `xorm:"id bigint autoincr pk"` - Address string `xorm:"address char(34) notnull index"` //账户地址 - SignerWeight int `xorm:"signer_weight int null default '0'"` - SignerEntrie string `xorm:"signer_entrie char(34) notnull index"` - - Type int `xorm:"type tinyint notnull default '0'"` //账户类型 - Balance math.HexOrDecimal256 `xorm:"balance decimal(38,0) notnull default '0'"` //账户余额 - Creator string `xorm:"creator char(40) notnull default '' "` - BirthTimestamp int64 `xorm:"birth_timestamp int notnull default '0' "` //账户第一次出现的时间 - LastActiveTimestamp int64 `xorm:"last_active_timestamp int notnull default '0'"` //账户上次活跃时间 -} - -func (t Account) TableName() string { - return tableName("account") -} diff --git a/service/model/xrp/amount.go b/service/model/xrp/amount.go deleted file mode 100644 index c480389..0000000 --- a/service/model/xrp/amount.go +++ /dev/null @@ -1,20 +0,0 @@ -package xrp - -type Amount struct { - ID int64 `xorm:"id bigint autoincr pk"` - ParentHash string `xorm:"parent_hash char(64) notnull index"` - LedgerIndex int64 `xorm:"ledger_index int index"` - CloseTime int64 `xorm:"close_time int notnull"` - Currency string `xorm:"currency char(8) notnull index"` - Value string `xorm:"value decimal(38,4) notnull default '0'"` - Issuer string `xorm:"issuer char(34) notnull index"` - AmountType int `xorm:"amount_type int notnull"` - //1:Payment Amount 2:Payment SendMax 3:Payment DeliverMin - //4: OfferCreate TakerGets 5:OfferCreate TakerPay - //6: TrustSet LimitAmount - -} - -func (t Amount) TableName() string { - return tableName("amount") -} diff --git a/service/model/xrp/meta.go b/service/model/xrp/meta.go deleted file mode 100644 index 67afd0d..0000000 --- a/service/model/xrp/meta.go +++ /dev/null @@ -1,30 +0,0 @@ -package xrp - -import ( - "fmt" - "time" -) - -const ( - TablePrefix = "xrp" -) - -type Meta struct { - ID int64 `xorm:"id bigint autoincr pk"` - Name string `xorm:"name varchar(255) notnull unique"` - LastID int64 `xorm:"last_id bigint notnull"` - Count int64 `xorm:"count bigint notnull"` - CreatedTime time.Time `xorm:"created_time created notnull"` - UpdatedTime time.Time `xorm:"updated_time updated notnull"` -} - -func (t Meta) TableName() string { - return tableName("meta") -} - -func tableName(s string) string { - if len(TablePrefix) == 0 { - return s - } - return fmt.Sprintf("%s_%s", TablePrefix, s) -} diff --git a/service/model/xrp/path.go b/service/model/xrp/path.go deleted file mode 100644 index 2f23cf2..0000000 --- a/service/model/xrp/path.go +++ /dev/null @@ -1,18 +0,0 @@ -package xrp - -type Path struct { - ID int64 `xorm:"id bigint autoincr pk"` - ParentHash string `xorm:"parent_hash char(64) notnull index"` - LedgerIndex int64 `xorm:"ledger_index int index"` - CloseTime int64 `xorm:"close_time int notnull"` - Currency string `xorm:"currency char(8) notnull index"` - Issuer string `xorm:"issuer char(34) notnull index"` - Type int64 `xorm:"type int null"` - Account string `xorm:"account char(34) null index"` - InTxIndex int64 `xorm:"in_tx_index int"` - InPathIndex int64 `xorm:"in_path_index int"` -} - -func (t Path) TableName() string { - return tableName("path") -} diff --git a/service/model/xrp/transaction.go b/service/model/xrp/transaction.go deleted file mode 100644 index 42456a0..0000000 --- a/service/model/xrp/transaction.go +++ /dev/null @@ -1,92 +0,0 @@ -package xrp - -type Transaction struct { - ID int64 `xorm:"id bigint autoincr pk"` - Account string `xorm:"account varchar(68) notnull index"` - TransactionType string `xorm:"transaction_type char(30) notnull index"` - //The type of transaction. Valid types include: Payment, OfferCreate, OfferCancel, TrustSet, - // AccountSet, SetRegularKey, SignerListSet, EscrowCreate, EscrowFinish, EscrowCancel, - // PaymentChannelCreate, PaymentChannelFund, PaymentChannelClaim, and DepositPreauth. - Fee int64 `xorm:"fee bigint notnull"` //in drops - Sequence int64 `xorm:"sequence bigint notnull"` - AccountTxnID string `xorm:"account_txn_id varchar(128) null"` - Flags int64 `xorm:"flags bigint null"` - LastLedgerSequence int64 `xorm:"last_ledger_sequence bigint null"` - Memos string `xorm:"memos varchar(1024) null"` - Signers string `xorm:"signers varchar(2048) null"` - SourceTag int64 `xorm:"source_tag bigint null"` - SigningPubKey string `xorm:"signing_pub_key varchar(132) null"` - TxnSignature string `xorm:"txn_signature varchar(380) null"` - - Hash string `xorm:"hash varchar(128) notnull index"` - LedgerIndex int64 `xorm:"ledger_index bigint index"` - AffectedNodesLen int `xorm:"affected_nodes_len bigint null"` - TransactionResult string `xorm:"transaction_result varchar(30) null"` - TransactionIndex int `xorm:"transaction_index bigint null"` - Validated int `xorm:"validated tinyint null"` - Date int64 `xorm:"date bigint null"` - - //additional - CloseTime int64 `xorm:"close_time bigint notnull"` - - //Payment - Amount int64 `xorm:"amount bigint notnull"` //if xrp, then in drops, else be -1 and ref: fk-Amount - Destination string `xorm:"destination varchar(68) null"` - DestinationTag int64 `xorm:"destination_tag bigint null"` - InvoiceID string `xorm:"invoice_id varchar(128) null"` - //Pathes fk - PathesLen int `xorm:"pathes_len bigint null"` - SendMax int64 `xorm:"send_max bigint notnull"` //if xrp, then in drops, else be -1 and ref: fk-Amount - DeliverMin int64 `xorm:"deliver_min bigint notnull"` //if xrp, then in drops, else be -1 and ref: fk-Amount - - //OfferCreate OfferCancel - Expiration int64 `xorm:"expiration bigint null"` - OfferSequence int `xorm:"offer_sequence bigint null"` - TakerGets int64 `xorm:"taker_gets bigint null"` //if xrp, then in drops, else be -1 and ref: fk-Amount - TakerPays int64 `xorm:"taker_pays bigint null"` //if xrp, then in drops, else be -1 and ref: fk-Amount - - //TrustSet - LimitAmount int64 `xorm:"limit_amount bigint null"` //if xrp, then in drops, else be -1 and ref: fk-Amount - QualityIn int64 `xorm:"quality_in bigint null"` - QualityOut int64 `xorm:"quality_out bigint null"` - - //AccountSet - ClearFlag int `xorm:"clear_flag bigint null"` - Domain string `xorm:"domain varchar(512) null"` - EmailHash string `xorm:"email_hash char(64) null"` - MessageKey string `xorm:"message_key varchar(68) null"` - SetFlag int `xorm:"set_flag bigint null"` - TransferRate int `xorm:"transfer_rate bigint null"` - TickSize int `xorm:"tick_size bigint null"` - //WalletLocator WalletSize: not used - - //SetRegularKey - RegularKey string `xorm:"regular_key varchar(68) null"` - - //SignerListSet - SignerQuorum int `xorm:"signer_quorum bigint null"` - - //EscrowCreate - CancelAfter int64 `xorm:"cancel_after bigint null"` - FinishAfter int64 `xorm:"finish_after bigint null"` - Condition string `xorm:"condition varchar(512) null"` - - //EscrowFinish - Owner string `xorm:"owner varchar(68) null"` - Fulfillment string `xorm:"fulfillment varchar(512) null"` - //EscrowCancel - //PaymentChannelCreate - SettleDelay int64 `xorm:"settle_delay bigint null"` - PublicKey string `xorm:"public_key char(66) null"` - //PaymentChannelFund - Channel string `xorm:"channel char(64) null"` - //PaymentChannelClaim - Balance int64 `xorm:"balance bigint null"` - //DepositPreauth - Authorize string `xorm:"authorize varchar(68) null"` - UnAuthorize string `xorm:"un_authorize varchar(68) null"` -} - -func (t Transaction) TableName() string { - return tableName("transaction") -} diff --git a/service/model/xrp/transaction_affected_nodes.go b/service/model/xrp/transaction_affected_nodes.go deleted file mode 100644 index 139ac53..0000000 --- a/service/model/xrp/transaction_affected_nodes.go +++ /dev/null @@ -1,19 +0,0 @@ -package xrp - -type AffectedNodes struct { - ID int64 `xorm:"id bigint autoincr pk"` - ParentHash string `xorm:"parent_hash char(64) notnull index"` - LedgerIndex int64 `xorm:"ledger_index int index"` - CloseTime int64 `xorm:"close_time int notnull"` - - NodeType string `xorm:"node_type char(20) notnull index"` - LedgerEntryType string `xorm:"ledger_entry_type char(20) null index"` - NodeLedgerIndex string `xorm:"hash char(64) not null"` - PreviousTxnID string `xorm:"previous_txn_id char(64) null"` - PreviousTxnLgrSeq int64 `xorm:"previous_txn_lgr_seq int null"` - FullJsonStr string `xorm:"full_json_str varchar(1024) null"` -} - -func (t AffectedNodes) TableName() string { - return tableName("transaction_affected_nodes") -} diff --git a/splitter/ripple/handler_rpc.go b/splitter/ripple/handler_rpc.go new file mode 100644 index 0000000..bc2cc0a --- /dev/null +++ b/splitter/ripple/handler_rpc.go @@ -0,0 +1,79 @@ +package ripple + +import ( + "errors" + "github.com/jdcloud-bds/bds/common/json" + "github.com/jdcloud-bds/bds/common/jsonrpc" + "github.com/jdcloud-bds/bds/common/log" + "strconv" + "strings" +) + +type rpcHandler struct { + client *jsonrpc.Client +} + +func newRPCHandler(c *jsonrpc.Client) (*rpcHandler, error) { + h := new(rpcHandler) + h.client = c + return h, nil +} + +type CompleteLedgers struct { + startLedger int64 + endLedger int64 +} + +func (h *rpcHandler) GetCompleteLedgers() (map[int]*CompleteLedgers, error) { + totalCompleteLedgers := make(map[int]*CompleteLedgers, 0) + + res, err := h.client.CallXRP("server_info") + if err != nil { + return nil, err + } + data := string(res) + clStr := json.Get(data, "result.info.complete_ledgers").String() + //clStr demo:"47025320,47025422-47025425,47025527-47025528,47025629-47025661,47025763,47025864-47025877" + log.Info("splitter ripple: get completed ledgers: %s\n\n", clStr) + if strings.ToLower(clStr) == "empty" { + return nil, nil + } + + clList := strings.Split(clStr, ",") + for i, v := range clList { + cl := new(CompleteLedgers) + index := strings.Index(v, "-") + if index > 0 { + cl.startLedger, _ = strconv.ParseInt(v[:index], 10, 64) + cl.endLedger, _ = strconv.ParseInt(v[index+1:], 10, 64) + } else { + cl.startLedger, _ = strconv.ParseInt(v, 10, 64) + cl.endLedger, _ = strconv.ParseInt(v, 10, 64) + } + totalCompleteLedgers[i] = cl + } + return totalCompleteLedgers, nil +} + +func (h *rpcHandler) SendBatchLedger(start, end int64) error { + defer stats.Add(MetricRPCCall, 1) + params := make(map[string]int64, 0) + params[ParamStartLedgerIndex] = start + params[ParamEndLedgerIndex] = end + + result, err := h.client.CallXRP("send_batch_ledger", params) + log.Debug("splitter ripple: send batch ledger result: %s", string(result)) + if err != nil { + return err + } + status := json.Get(string(result), "result.status").String() + errorMessage := json.Get(string(result), "result.error_message").String() + if status == "error" { + log.Error("splitter ripple: send batch ledger result error : %s", errorMessage) + log.DetailDebug("splitter ripple: send batch ledger result error : %s", errorMessage) + return errors.New("result status is error") + } + return nil +} + + diff --git a/splitter/ripple/handler_rpc_test.go b/splitter/ripple/handler_rpc_test.go new file mode 100644 index 0000000..dd9fb0a --- /dev/null +++ b/splitter/ripple/handler_rpc_test.go @@ -0,0 +1,33 @@ +package ripple + +import ( + "github.com/jdcloud-bds/bds/common/httputils" + "github.com/jdcloud-bds/bds/common/jsonrpc" + "testing" +) + +func TestSendBatchLedger(t *testing.T) { + httpClient := httputils.NewRestClientWithAuthentication(nil) + remoteHandler, err := newRPCHandler(jsonrpc.New(httpClient, "http://116.196.114.8:5555")) + if err != nil { + t.Fatal(err) + } + err = remoteHandler.SendBatchLedger(50588462, 50588462) + if err != nil { + t.Fatal(err) + } + t.Log("over") +} + +func TestGetCompleteLedgers(t *testing.T) { + httpClient := httputils.NewRestClientWithAuthentication(nil) + remoteHandler, err := newRPCHandler(jsonrpc.New(httpClient, "http://116.196.114.8:5555")) + if err != nil { + t.Fatal(err) + } + result, err := remoteHandler.GetCompleteLedgers() + if err != nil { + t.Fatal(err) + } + t.Logf("complete ledgers result : %v", result) +} diff --git a/splitter/ripple/job.go b/splitter/ripple/job.go new file mode 100644 index 0000000..526f4a6 --- /dev/null +++ b/splitter/ripple/job.go @@ -0,0 +1,178 @@ +package ripple + +import ( + "fmt" + "github.com/jdcloud-bds/bds/common/log" + "github.com/jdcloud-bds/bds/service" + model "github.com/jdcloud-bds/bds/service/model/xrp" + "strconv" + "time" +) + +type WorkerJob interface { + Run() + Name() string + run() error +} + +type updateMetaDataJob struct { + splitter *XRPSplitter + name string +} + +func newUpdateMetaDataJob(splitter *XRPSplitter) *updateMetaDataJob { + j := new(updateMetaDataJob) + j.splitter = splitter + j.name = "update meta data" + return j +} + +func (j *updateMetaDataJob) Run() { + _ = j.run() +} + +func (j *updateMetaDataJob) Name() string { + return j.name +} + +func (j *updateMetaDataJob) run() error { + startTime := time.Now() + db := service.NewDatabase(j.splitter.cfg.Engine) + metas := make([]*model.Meta, 0) + err := db.Find(&metas) + if err != nil { + log.Error("ripple job : '%s' get table list from meta error", j.name) + return err + } + + for _, meta := range metas { + cond := new(model.Meta) + cond.Name = meta.Name + data := new(model.Meta) + + var countSql string + if j.splitter.cfg.Engine.DriverName() == "mssql" { + countSql = fmt.Sprintf("SELECT b.rows AS count FROM sysobjects a INNER JOIN sysindexes b ON a.id = b.id WHERE a.type = 'u' AND b.indid in (0,1) AND a.name='%s'", meta.Name) + } else { + countSql = fmt.Sprintf("SELECT COUNT(1) FROM `%s`", meta.Name) + } + result, err := db.QueryString(countSql) + if err != nil { + log.Error("ripple job : %s get table %s count from meta error", j.name, meta.Name) + log.DetailError(err) + continue + } + if len(result) == 0 { + continue + } + count, _ := strconv.ParseInt(result[0]["count"], 10, 64) + + sql := db.Table(meta.Name).Cols("id").Desc("id").Limit(1, 0) + result, err = sql.QueryString() + if err != nil { + log.Error("ripple job : '%s' get table %s id from meta error", j.name, meta.Name) + log.DetailError(err) + continue + } + for _, v := range result { + id, _ := strconv.ParseInt(v["id"], 10, 64) + data.LastID = id + data.Count = count + _, err = db.Update(data, cond) + if err != nil { + log.Error("ripple job : '%s' update table %s meta error", j.name, meta.Name) + log.DetailError(err) + continue + } + + } + } + stats.Add(MetricCronWorkerJobUpdateMetaData, 1) + elapsedTime := time.Now().Sub(startTime) + log.Debug("ripple job : '%s' elapsed time %s", j.name, elapsedTime.String()) + return nil +} + +type getBatchLedgerJob struct { + splitter *XRPSplitter + name string +} + +func newGetBatchLedgerJob(splitter *XRPSplitter) *getBatchLedgerJob { + j := new(getBatchLedgerJob) + j.splitter = splitter + j.name = "'get batch ledger'" + return j +} + +func (j *getBatchLedgerJob) Run() { + _ = j.run() +} + +func (j *getBatchLedgerJob) Name() string { + return j.name +} + +func (j *getBatchLedgerJob) run() error { + startTime := time.Now() + db := service.NewDatabase(j.splitter.cfg.Engine) + totalCompleteLedgers, err := j.splitter.remoteHandler.GetCompleteLedgers() + if err != nil { + log.Error("ripple job : %s get closed ledgers error", j.name) + log.DetailError("ripple job error : %s", err.Error()) + return err + } + batchSize := 1000 + + ledgerList := make(map[int64]bool, 0) + sql := "select ledger_index from ripple_ledger order by ledger_index asc" + result, err := db.QueryString(sql) + if err != nil { + log.Error("ripple job : get ledger_index from database error") + log.DetailError(err) + return err + } + for _, v := range result { + ledgerIndex, _ := strconv.ParseInt(v["ledger_index"], 10, 64) + ledgerList[ledgerIndex] = true + } + for _, cl := range totalCompleteLedgers { + missedLedger := make(map[int64]bool, 0) + i := 0 + start := int64(0) + end := int64(0) + flag := false + for k := cl.startLedger; k <= cl.endLedger; k++ { + if _, ok := ledgerList[k]; !ok { + if !flag { + start = k + end = k + flag = true + } else { + end = k + } + missedLedger[k] = true + i++ + } else if flag { + break + } + + if i >= batchSize { + break + } + } + if len(missedLedger) > 0 && end >= start { + log.Info("splitter ripple: send batch ledger from %d to %d.", start, end) + err = j.splitter.remoteHandler.SendBatchLedger(start, end) + if err != nil { + log.Error("splitter ripple: send batch ledger error: %s", err.Error()) + log.DetailError(err) + return err + } + } + } + + elapsedTime := time.Now().Sub(startTime) + log.Debug("ripple job : '%s' elapsed time %s", j.name, elapsedTime.String()) + return nil +} diff --git a/splitter/ripple/meta.go b/splitter/ripple/meta.go new file mode 100644 index 0000000..72d0e74 --- /dev/null +++ b/splitter/ripple/meta.go @@ -0,0 +1,64 @@ +package ripple + +import ( + "github.com/jdcloud-bds/bds/common/metric" + model "github.com/jdcloud-bds/bds/service/model/ripple" + "math/big" +) + +const ( + MetricReceiveMessages = "receive_messages" + MetricParseDataError = "parse_data_error" + MetricVaildationSuccess = "validation_success" + MetricVaildationError = "validation_error" + MetricDatabaseRollback = "database_rollback" + MetricDatabaseCommit = "database_commit" + MetricCronWorkerJob = "cron_worker_job" + MetricCronWorkerJobUpdateMetaData = "cron_worker_job_update_meta_data" + MetricCronWorkerJobGetBatchLedger = "cron_worker_job_get_batch_ledger" + MetricRPCCall = "rpc_call" + + ParamStartLedgerIndex = "start_ledger_index" + ParamEndLedgerIndex = "end_ledger_index" +) + +const ( + AccountSet = "AccountSet" + DepositPreauth = "DepositPreauth" + EscrowCancel = "EscrowCancel" + EscrowCreate = "EscrowCreate" + EscrowFinish = "EscrowFinish" + OfferCancel = "OfferCancel" + OfferCreate = "OfferCreate" + Payment = "Payment" + PaymentChannelClaim = "PaymentChannelClaim" + PaymentChannelCreate = "PaymentChannelCreate" + PaymentChannelFund = "PaymentChannelFund" + SetRegularKey = "SetRegularKey" + SignerListSet = "SignerListSet" + TrustSet = "TrustSet" +) + +var ( + stats = metric.NewMap("ripple") + maxBigNumber, _ = new(big.Int).SetString("100000000000000000000000000000000000000", 10) + defaultBigNumber, _ = new(big.Int).SetString("-1", 10) +) + +type XRPLedgerData struct { + Ledger *model.Ledger + AccountSets []*model.AccountSet + DepositPreauths []*model.DepositPreauth + EscrowCancels []*model.EscrowCancel + EscrowCreates []*model.EscrowCreate + EscrowFinishes []*model.EscrowFinish + OfferCancels []*model.OfferCancel + OfferCreates []*model.OfferCreate + Payments []*model.Payment + PaymentChannelClaims []*model.PaymentChannelClaim + PaymentChannelCreates []*model.PaymentChannelCreate + PaymentChannelFunds []*model.PaymentChannelFund + SetRegularKeys []*model.SetRegularKey + SignerListSets []*model.SignerListSet + TrustSets []*model.TrustSet +} diff --git a/splitter/ripple/method.go b/splitter/ripple/method.go new file mode 100644 index 0000000..a6de003 --- /dev/null +++ b/splitter/ripple/method.go @@ -0,0 +1,262 @@ +package ripple + +import ( + "github.com/jdcloud-bds/bds/common/json" + "github.com/jdcloud-bds/bds/common/log" + "github.com/jdcloud-bds/bds/common/math" + model "github.com/jdcloud-bds/bds/service/model/ripple" + "time" +) + +func parseTransactionCommonFields(commonFields *model.TransactionCommonFields, input string, index, timestamp int64) { + commonFields.TransactionType = json.Get(input, "TransactionType").String() + commonFields.ID = 0 + commonFields.Account = json.Get(input, "Account").String() + commonFields.Hash = json.Get(input, "hash").String() + commonFields.LedgerIndex = index + commonFields.Timestamp = timestamp + commonFields.Fee = json.Get(input, "Fee").Int() + commonFields.Sequence = json.Get(input, "Sequence").Int() + commonFields.AccountTxnID = json.Get(input, "AccountTxnID").String() + commonFields.Flags = json.Get(input, "Flags").Int() + commonFields.LastLedgerSequence = json.Get(input, "LastLedgerSequence").Int() + commonFields.Memos = json.Get(input, "Memos").String() + commonFields.Signers = json.Get(input, "Signers").String() + commonFields.SourceTag = json.Get(input, "SourceTag").Int() + commonFields.SigningPubKey = json.Get(input, "SigningPubKey").String() + commonFields.TxnSignature = json.Get(input, "TxnSignature").String() + + commonFields.TransactionResult = json.Get(input, "metaData.TransactionResult").String() + commonFields.TransactionIndex = int(json.Get(input, "metaData.TransactionIndex").Int()) + commonFields.AffectedNodes = json.Get(input, "metaData.AffectedNodes").String() + commonFields.DeliveredAmount = json.Get(input, "metaData.DeliveredAmount").String() +} + +func ParseLedger(data string) (*XRPLedgerData, error) { + startTime := time.Now() + var err error + + b := new(XRPLedgerData) + b.Ledger = new(model.Ledger) + b.AccountSets = make([]*model.AccountSet, 0) + b.DepositPreauths = make([]*model.DepositPreauth, 0) + b.EscrowCancels = make([]*model.EscrowCancel, 0) + b.EscrowCreates = make([]*model.EscrowCreate, 0) + b.EscrowFinishes = make([]*model.EscrowFinish, 0) + b.OfferCancels = make([]*model.OfferCancel, 0) + b.OfferCreates = make([]*model.OfferCreate, 0) + b.Payments = make([]*model.Payment, 0) + b.PaymentChannelClaims = make([]*model.PaymentChannelClaim, 0) + b.PaymentChannelCreates = make([]*model.PaymentChannelCreate, 0) + b.PaymentChannelFunds = make([]*model.PaymentChannelFund, 0) + b.SetRegularKeys = make([]*model.SetRegularKey, 0) + b.SignerListSets = make([]*model.SignerListSet, 0) + b.TrustSets = make([]*model.TrustSet, 0) + + //Ledger + b.Ledger.Accepted = int(json.Get(data, "accepted").Int()) + b.Ledger.AccountHash = json.Get(data, "account_hash").String() + b.Ledger.CloseFlags = int(json.Get(data, "close_flags").Int()) + b.Ledger.CloseTime = json.Get(data, "close_time").Int() + b.Ledger.Timestamp = json.Get(data, "close_time").Int() + 946656000 + b.Ledger.CloseTimeHuman = json.Get(data, "close_time_human").String() + b.Ledger.CloseTimeResolution = int(json.Get(data, "close_time_resolution").Int()) + b.Ledger.Closed = int(json.Get(data, "closed").Int()) + b.Ledger.Hash = json.Get(data, "hash").String() + b.Ledger.LedgerHash = json.Get(data, "ledger_hash").String() + b.Ledger.LedgerIndex = json.Get(data, "ledger_index").Int() + b.Ledger.ParentCloseTime = json.Get(data, "parent_close_time").Int() + b.Ledger.ParentHash = json.Get(data, "parent_hash").String() + b.Ledger.SeqNum = json.Get(data, "seqNum").Int() + totalCoins := json.Get(data, "total_coins").String() + b.Ledger.TotalCoins, err = parseBigInt(totalCoins) + if err != nil { + log.Error("splitter ripple: Ledger %d TotalCoins '%s' parse error", b.Ledger.LedgerIndex, totalCoins) + return nil, err + } + b.Ledger.TransactionHash = json.Get(data, "transaction_hash").String() + + transactionList := json.Get(data, "transactions").Array() + for _, transaction := range transactionList { + + transactionType := json.Get(transaction.String(), "TransactionType").String() + + switch transactionType { + case AccountSet: + accountSet := new(model.AccountSet) + parseTransactionCommonFields(&accountSet.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + accountSet.ClearFlag = json.Get(transaction.String(), "ClearFlag").Int() + accountSet.Domain = json.Get(transaction.String(), "Domain").String() + accountSet.EmailHash = json.Get(transaction.String(), "EmailHash").String() + accountSet.MessageKey = json.Get(transaction.String(), "MessageKey").String() + accountSet.SetFlag = json.Get(transaction.String(), "SetFlag").Int() + accountSet.TransferRate = json.Get(transaction.String(), "TransferRate").Int() + + b.AccountSets = append(b.AccountSets, accountSet) + log.Debug("splitter ripple: %s transaction %s .", transactionType, accountSet.Hash) + + case DepositPreauth: + depositPreauth := new(model.DepositPreauth) + parseTransactionCommonFields(&depositPreauth.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + depositPreauth.Authorize = json.Get(transaction.String(), "Authorize").String() + depositPreauth.UnAuthorize = json.Get(transaction.String(), "Unauthorize").String() + + b.DepositPreauths = append(b.DepositPreauths, depositPreauth) + log.Debug("splitter ripple: %s transaction %s .", transactionType, depositPreauth.Hash) + + case EscrowCancel: + escrowCancel := new(model.EscrowCancel) + parseTransactionCommonFields(&escrowCancel.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + escrowCancel.Owner = json.Get(transaction.String(), "Owner").String() + escrowCancel.OfferSequence = json.Get(transaction.String(), "OfferSequence").Int() + + b.EscrowCancels = append(b.EscrowCancels, escrowCancel) + log.Debug("splitter ripple: %s transaction %s .", transactionType, escrowCancel.Hash) + + case EscrowCreate: + escrowCreate := new(model.EscrowCreate) + parseTransactionCommonFields(&escrowCreate.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + escrowCreate.Amount = json.Get(transaction.String(), "Amount").String() + escrowCreate.Destination = json.Get(transaction.String(), "Destination").String() + escrowCreate.CancelAfter = json.Get(transaction.String(), "CancelAfter").Int() + escrowCreate.FinishAfter = json.Get(transaction.String(), "FinishAfter").Int() + escrowCreate.Condition = json.Get(transaction.String(), "Condition").String() + escrowCreate.DestinationTag = json.Get(transaction.String(), "DestinationTag").Int() + + b.EscrowCreates = append(b.EscrowCreates, escrowCreate) + log.Debug("splitter ripple: %s transaction %s .", transactionType, escrowCreate.Hash) + + case EscrowFinish: + escrowFinish := new(model.EscrowFinish) + parseTransactionCommonFields(&escrowFinish.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + escrowFinish.Owner = json.Get(transaction.String(), "Owner").String() + escrowFinish.OfferSequence = json.Get(transaction.String(), "OfferSequence").Int() + escrowFinish.Condition = json.Get(transaction.String(), "Condition").String() + escrowFinish.Fulfillment = json.Get(transaction.String(), "Fulfillment").String() + + b.EscrowFinishes = append(b.EscrowFinishes, escrowFinish) + log.Debug("splitter ripple: %s transaction %s .", transactionType, escrowFinish.Hash) + + case OfferCancel: + offerCancel := new(model.OfferCancel) + parseTransactionCommonFields(&offerCancel.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + offerCancel.OfferSequence = json.Get(transaction.String(), "OfferSequence").Int() + + b.OfferCancels = append(b.OfferCancels, offerCancel) + log.Debug("splitter ripple: %s transaction %s .", transactionType, offerCancel.Hash) + + case OfferCreate: + offerCreate := new(model.OfferCreate) + parseTransactionCommonFields(&offerCreate.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + offerCreate.Expiration = json.Get(transaction.String(), "Expiration").Int() + offerCreate.OfferSequence = json.Get(transaction.String(), "OfferSequence").Int() + offerCreate.TakerGets = json.Get(transaction.String(), "TakerGets").String() + offerCreate.TakerPays = json.Get(transaction.String(), "TakerPays").String() + + b.OfferCreates = append(b.OfferCreates, offerCreate) + log.Debug("splitter ripple: %s transaction %s .", transactionType, offerCreate.Hash) + + case Payment: + payment := new(model.Payment) + parseTransactionCommonFields(&payment.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + payment.Amount = json.Get(transaction.String(), "Amount").String() + payment.Destination = json.Get(transaction.String(), "Destination").String() + payment.DestinationTag = json.Get(transaction.String(), "DestinationTag").Int() + payment.InvoiceID = json.Get(transaction.String(), "InvoiceID").String() + payment.Paths = json.Get(transaction.String(), "Paths").String() + payment.SendMax = json.Get(transaction.String(), "SendMax").String() + payment.DeliverMin = json.Get(transaction.String(), "DeliverMin").String() + + b.Payments = append(b.Payments, payment) + log.Debug("splitter ripple: %s transaction %s .", transactionType, payment.Hash) + + case PaymentChannelClaim: + paymentChannelClaim := new(model.PaymentChannelClaim) + parseTransactionCommonFields(&paymentChannelClaim.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + paymentChannelClaim.Amount = json.Get(transaction.String(), "Amount").String() + paymentChannelClaim.Channel = json.Get(transaction.String(), "Channel").String() + paymentChannelClaim.Balance = json.Get(transaction.String(), "Balance").String() + paymentChannelClaim.PublicKey = json.Get(transaction.String(), "PublicKey").String() + paymentChannelClaim.Signature = json.Get(transaction.String(), "Signature").String() + + b.PaymentChannelClaims = append(b.PaymentChannelClaims, paymentChannelClaim) + log.Debug("splitter ripple: %s transaction %s .", transactionType, paymentChannelClaim.Hash) + + case PaymentChannelCreate: + paymentChannelCreate := new(model.PaymentChannelCreate) + parseTransactionCommonFields(&paymentChannelCreate.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + paymentChannelCreate.Amount = json.Get(transaction.String(), "Amount").String() + paymentChannelCreate.Destination = json.Get(transaction.String(), "Destination").String() + paymentChannelCreate.DestinationTag = json.Get(transaction.String(), "DestinationTag").Int() + paymentChannelCreate.SettleDelay = json.Get(transaction.String(), "SettleDelay").Int() + paymentChannelCreate.PublicKey = json.Get(transaction.String(), "PublicKey").String() + paymentChannelCreate.CancelAfter = json.Get(transaction.String(), "CancelAfter").Int() + + b.PaymentChannelCreates = append(b.PaymentChannelCreates, paymentChannelCreate) + log.Debug("splitter ripple: %s transaction %s .", transactionType, paymentChannelCreate.Hash) + + case PaymentChannelFund: + paymentChannelFund := new(model.PaymentChannelFund) + parseTransactionCommonFields(&paymentChannelFund.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + paymentChannelFund.Amount = json.Get(transaction.String(), "Amount").String() + paymentChannelFund.Channel = json.Get(transaction.String(), "Channel").String() + paymentChannelFund.Expiration = json.Get(transaction.String(), "Expiration").Int() + + b.PaymentChannelFunds = append(b.PaymentChannelFunds, paymentChannelFund) + log.Debug("splitter ripple: %s transaction %s .", transactionType, paymentChannelFund.Hash) + + case SetRegularKey: + setRegularKey := new(model.SetRegularKey) + parseTransactionCommonFields(&setRegularKey.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + setRegularKey.RegularKey = json.Get(transaction.String(), "RegularKey").String() + + b.SetRegularKeys = append(b.SetRegularKeys, setRegularKey) + log.Debug("splitter ripple: %s transaction %s .", transactionType, setRegularKey.Hash) + + case SignerListSet: + signerListSet := new(model.SignerListSet) + parseTransactionCommonFields(&signerListSet.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + signerListSet.SignerQuorum = json.Get(transaction.String(), "SignerQuorum").Int() + signerListSet.SignerEntries = json.Get(transaction.String(), "SignerEntries").String() + + b.SignerListSets = append(b.SignerListSets, signerListSet) + log.Debug("splitter ripple: %s transaction %s .", transactionType, signerListSet.Hash) + + case TrustSet: + trustSet := new(model.TrustSet) + parseTransactionCommonFields(&trustSet.TransactionCommonFields, transaction.String(), b.Ledger.LedgerIndex, b.Ledger.Timestamp) + trustSet.LimitAmount = json.Get(transaction.String(), "LimitAmount").String() + trustSet.QualityIn = json.Get(transaction.String(), "QualityIn").Int() + trustSet.QualityOut = json.Get(transaction.String(), "QualityOut").Int() + + b.TrustSets = append(b.TrustSets, trustSet) + log.Debug("splitter ripple: %s transaction %s .", transactionType, trustSet.Hash) + } + } + + b.Ledger.TransactionLength = len(transactionList) + + elaspedTime := time.Now().Sub(startTime) + log.Debug("splitter ripple: parse ledger %d, txs %d, elasped time %s", b.Ledger.LedgerIndex, b.Ledger.TransactionLength, elaspedTime.String()) + + return b, nil +} + +func parseBigInt(s string) (math.HexOrDecimal256, error) { + var n math.HexOrDecimal256 + if s == "0x" { + s = "0x0" + } + + v, ok := math.ParseBig256(s) + if !ok { + n = math.HexOrDecimal256(*defaultBigNumber) + } else { + if v.Cmp(maxBigNumber) >= 0 { + n = math.HexOrDecimal256(*defaultBigNumber) + } else { + n = math.HexOrDecimal256(*v) + } + } + return n, nil +} diff --git a/splitter/ripple/schema.json b/splitter/ripple/schema.json new file mode 100644 index 0000000..7a73a41 --- /dev/null +++ b/splitter/ripple/schema.json @@ -0,0 +1,2 @@ +{ +} \ No newline at end of file diff --git a/splitter/ripple/splitter.go b/splitter/ripple/splitter.go new file mode 100644 index 0000000..8763fc2 --- /dev/null +++ b/splitter/ripple/splitter.go @@ -0,0 +1,468 @@ +package ripple + +import ( + "fmt" + "github.com/go-xorm/xorm" + "github.com/jdcloud-bds/bds/common/httputils" + "github.com/jdcloud-bds/bds/common/jsonrpc" + "github.com/jdcloud-bds/bds/common/kafka" + "github.com/jdcloud-bds/bds/common/log" + "github.com/jdcloud-bds/bds/service" + model "github.com/jdcloud-bds/bds/service/model/ripple" + "github.com/xeipuuv/gojsonschema" + "strconv" + "strings" + "time" +) + +type SplitterConfig struct { + Engine *xorm.Engine + Consumer *kafka.ConsumerGroup + Topic string + DatabaseEnable bool + MaxBatchBlock int + Endpoint string + User string + Password string + JSONSchemaFile string + JSONSchemaValidationEnable bool + DatabaseWorkerNumber int + DatabaseWorkerBuffer int +} + +type XRPSplitter struct { + cfg *SplitterConfig + remoteHandler *rpcHandler + cronWorker *CronWorker + jsonSchemaLoader gojsonschema.JSONLoader + missedBlockList map[int64]bool + latestSaveDataTimestamp time.Time + latestReceiveMessageTimestamp time.Time + databaseWorkerChan chan *XRPLedgerData + databaseWorkerStopChan chan bool +} + +func NewSplitter(cfg *SplitterConfig) (*XRPSplitter, error) { + var err error + s := new(XRPSplitter) + s.cfg = cfg + s.databaseWorkerChan = make(chan *XRPLedgerData, cfg.DatabaseWorkerBuffer) + s.databaseWorkerStopChan = make(chan bool, 0) + s.missedBlockList = make(map[int64]bool, 0) + httpClient := httputils.NewRestClientWithBasicAuth(s.cfg.User, s.cfg.Password) + s.remoteHandler, err = newRPCHandler(jsonrpc.New(httpClient, s.cfg.Endpoint)) + if err != nil { + log.DetailError(err) + } + + if s.cfg.JSONSchemaValidationEnable { + f := fmt.Sprintf("file://%s", s.cfg.JSONSchemaFile) + s.jsonSchemaLoader = gojsonschema.NewReferenceLoader(f) + } + + s.cronWorker = NewCronWorker(s) + //err = s.cronWorker.Prepare() + //if err != nil { + // log.DetailError(err) + // return nil, err + //} + + return s, nil +} + +func (s *XRPSplitter) Start() { + err := s.cronWorker.Start() + if err != nil { + log.Error("splitter ripple: cron worker start error") + log.DetailError(err) + return + } + + err = s.cfg.Consumer.Start(s.cfg.Topic) + if err != nil { + log.Error("splitter ripple: consumer start error") + log.DetailError(err) + return + } + + for i := 0; i < s.cfg.DatabaseWorkerNumber; i++ { + go s.databaseWorker(i) + } + + log.Debug("splitter ripple: consumer start topic %s", s.cfg.Topic) + log.Debug("splitter ripple: database enable is %v", s.cfg.DatabaseEnable) + + for { + select { + case message := <-s.cfg.Consumer.MessageChannel(): + stats.Add(MetricReceiveMessages, 1) + s.latestReceiveMessageTimestamp = time.Now() + + START: + if s.cfg.JSONSchemaValidationEnable { + ok, err := s.jsonSchemaValid(string(message.Data)) + if err != nil { + log.Error("splitter ripple: json schema valid error") + } + if !ok { + log.Warn("splitter ripple: json schema valid failed") + } + } + + data, err := ParseLedger(string(message.Data)) + if err != nil { + stats.Add(MetricParseDataError, 1) + log.Error("splitter ripple: ledger parse error, retry after 5s") + log.DetailError(err) + time.Sleep(time.Second * 5) + goto START + } + + exist, err := s.IsExistingLedger(data) + if err != nil { + stats.Add(MetricParseDataError, 1) + log.Error("splitter ripple: check ledger exist error, retry after 2s") + log.DetailError(err) + time.Sleep(time.Second * 2) + goto START + } + + if s.cfg.DatabaseEnable && !exist { + s.databaseWorkerChan <- data + s.cfg.Consumer.MarkOffset(message) + } + } + } +} + +func (s *XRPSplitter) Stop() { + s.cronWorker.Stop() +} + +func (s *XRPSplitter) IsExistingLedger(cur *XRPLedgerData) (bool, error) { + db := service.NewDatabase(s.cfg.Engine) + ledgers := make([]*model.Ledger, 0) + err := db.Where("ledger_index = ?", cur.Ledger.LedgerIndex).Find(&ledgers) + if err != nil { + log.DetailError(err) + return false, err + } + + if len(ledgers) == 0 { + //log.Warn("splitter ripple: can not find current ledger %d", cur.Ledger.LedgerIndex) + return false, nil + } + + return true, nil +} + +func (s *XRPSplitter) LedgerInsert(data *XRPLedgerData) error { + startTime := time.Now() + tx := service.NewTransaction(s.cfg.Engine) + defer tx.Close() + + err := tx.Begin() + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + + var affected int64 + ledgers := make([]*model.Ledger, 0) + ledgers = append(ledgers, data.Ledger) + affected, err = tx.BatchInsert(ledgers) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: ledger write %d rows", affected) + + if len(data.AccountSets) != 0 { + affected, err = tx.BatchInsert(data.AccountSets) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction AccountSets write %d rows", affected) + } + + if len(data.DepositPreauths) != 0 { + affected, err = tx.BatchInsert(data.DepositPreauths) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction DepositPreauths write %d rows", affected) + } + + if len(data.EscrowCancels) != 0 { + affected, err = tx.BatchInsert(data.EscrowCancels) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction EscrowCancels write %d rows", affected) + } + + if len(data.EscrowCreates) != 0 { + affected, err = tx.BatchInsert(data.EscrowCreates) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction EscrowCreates write %d rows", affected) + } + + if len(data.EscrowFinishes) != 0 { + affected, err = tx.BatchInsert(data.EscrowFinishes) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction EscrowFinishes write %d rows", affected) + } + + if len(data.OfferCancels) != 0 { + affected, err = tx.BatchInsert(data.OfferCancels) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction OfferCancels write %d rows", affected) + } + + if len(data.OfferCreates) != 0 { + affected, err = tx.BatchInsert(data.OfferCreates) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction OfferCreates write %d rows", affected) + } + + if len(data.Payments) != 0 { + affected, err = tx.BatchInsert(data.Payments) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction Payments write %d rows", affected) + } + + if len(data.PaymentChannelClaims) != 0 { + affected, err = tx.BatchInsert(data.PaymentChannelClaims) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction PaymentChannelClaims write %d rows", affected) + } + + if len(data.PaymentChannelCreates) != 0 { + affected, err = tx.BatchInsert(data.PaymentChannelCreates) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction PaymentChannelCreates write %d rows", affected) + } + + if len(data.PaymentChannelFunds) != 0 { + affected, err = tx.BatchInsert(data.PaymentChannelFunds) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction PaymentChannelFunds write %d rows", affected) + } + + if len(data.SetRegularKeys) != 0 { + affected, err = tx.BatchInsert(data.SetRegularKeys) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction SetRegularKeys write %d rows", affected) + } + + if len(data.SignerListSets) != 0 { + affected, err = tx.BatchInsert(data.SignerListSets) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction SignerListSets write %d rows", affected) + } + + if len(data.TrustSets) != 0 { + affected, err = tx.BatchInsert(data.TrustSets) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + log.Debug("splitter ripple: transaction TrustSets write %d rows", affected) + } + + err = tx.Commit() + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + return err + } + tx.Close() + stats.Add(MetricDatabaseCommit, 1) + elapsedTime := time.Now().Sub(startTime) + s.latestSaveDataTimestamp = time.Now() + log.Debug("splitter ripple: ledger data %d write done elapsed: %s", data.Ledger.LedgerIndex, elapsedTime.String()) + return nil +} + +func (s *XRPSplitter) revertLedger(ledgerIndex int64) error { + tx := service.NewTransaction(s.cfg.Engine) + defer tx.Close() + + err := tx.Begin() + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + sql := fmt.Sprintf("delete from ripple_ledger where ledger_index = %d", ledgerIndex) + _, err = tx.Exec(sql) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + sql = fmt.Sprintf("delete from ripple_ac where ledger_index = %d", ledgerIndex) + _, err = tx.Exec(sql) + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + stats.Add(MetricDatabaseRollback, 1) + return err + } + err = tx.Commit() + if err != nil { + _ = tx.Rollback() + log.DetailError(err) + return err + } + tx.Close() + return nil +} + +func (s *XRPSplitter) CheckMissedLedger() ([]int64, error) { + missedList := make([]int64, 0) + + db := service.NewDatabase(s.cfg.Engine) + sql := fmt.Sprintf("SELECT ledger_index FROM ripple_ledger ORDER BY ledger_index ASC") + data, err := db.QueryString(sql) + if err != nil { + return nil, err + } + + ledgerList := make([]*model.Ledger, 0) + for _, v := range data { + ledger := new(model.Ledger) + tmp := v["ledger_index"] + ledgerIndex, err := strconv.ParseInt(tmp, 10, 64) + if err != nil { + return nil, err + } + ledger.LedgerIndex = ledgerIndex + ledgerList = append(ledgerList, ledger) + } + + if len(ledgerList) > 0 { + checkList := make(map[int64]bool, 0) + for _, b := range ledgerList { + checkList[b.LedgerIndex] = true + } + + for i := int64(32570); i <= ledgerList[len(ledgerList)-1].LedgerIndex; i++ { + if _, ok := checkList[i]; !ok { + missedList = append(missedList, i) + } + } + } + + return missedList, nil +} + +func (s *XRPSplitter) jsonSchemaValid(data string) (bool, error) { + startTime := time.Now() + dataLoader := gojsonschema.NewStringLoader(data) + result, err := gojsonschema.Validate(s.jsonSchemaLoader, dataLoader) + if err != nil { + log.Error("splitter ripple: json schema validation error") + log.DetailError(err) + return false, err + } + if !result.Valid() { + for _, err := range result.Errors() { + log.Error("splitter ripple: data invalid %s", strings.ToLower(err.String())) + return false, nil + } + stats.Add(MetricVaildationError, 1) + } else { + stats.Add(MetricVaildationSuccess, 1) + } + elaspedTime := time.Now().Sub(startTime) + log.Debug("splitter ripple: json schema validation elapsed %s", elaspedTime) + return true, nil +} + +func (s *XRPSplitter) databaseWorker(i int) { + log.Info("splitter ripple: starting database worker %d", i) + for { + select { + case data := <-s.databaseWorkerChan: + err := s.LedgerInsert(data) + if err != nil { + log.Error("splitter ripple: ledger data %d insert error, retry after 5s", data.Ledger.LedgerIndex) + log.DetailError(err) + } + case stop := <-s.databaseWorkerStopChan: + if stop { + msg := fmt.Sprintf("splitter ripple: database worker %d stopped", i) + log.Info("splitter ripple: ", msg) + return + } + } + } +} diff --git a/splitter/xrp/worker.go b/splitter/ripple/worker.go similarity index 80% rename from splitter/xrp/worker.go rename to splitter/ripple/worker.go index 695a125..3f1d2c2 100644 --- a/splitter/xrp/worker.go +++ b/splitter/ripple/worker.go @@ -1,4 +1,4 @@ -package xrp +package ripple import ( "github.com/jdcloud-bds/bds/common/cron" @@ -24,7 +24,7 @@ func (w *CronWorker) Prepare() error { } for _, job := range jobList { - log.Debug("worker xrp: prepare %s", job.Name()) + log.Debug("worker ripple: prepare %s", job.Name()) err := job.run() if err != nil { return err @@ -47,18 +47,18 @@ func (w *CronWorker) Start() error { return err } stats.Add(MetricCronWorkerJob, 1) - log.Info("worker xrp: add job %s", job.Name()) + log.Info("worker ripple: add job %s", job.Name()) } - expr = config.SplitterConfig.CronXRPSetting.GetBatchBlockExpr + expr = config.SplitterConfig.CronXRPSetting.GetBatchLedgerExpr if len(expr) != 0 { - job = newGetBatchBlockJob(w.splitter) + job = newGetBatchLedgerJob(w.splitter) err = w.crontab.AddJob(job.Name(), expr, job) if err != nil { return err } stats.Add(MetricCronWorkerJob, 1) - log.Info("worker xrp: add job %s", job.Name()) + log.Info("worker ripple: add job %s", job.Name()) } w.crontab.Start() diff --git a/splitter/splitter.go b/splitter/splitter.go index 8e425fe..947afa1 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -18,7 +18,7 @@ import ( "github.com/jdcloud-bds/bds/splitter/ltc" "github.com/jdcloud-bds/bds/splitter/tron" "github.com/jdcloud-bds/bds/splitter/xlm" - "github.com/jdcloud-bds/bds/splitter/xrp" + "github.com/jdcloud-bds/bds/splitter/ripple" ) type Splitter struct { @@ -756,7 +756,7 @@ func (p *Splitter) Run() { //Loading configuration file information of XRP node if config.SplitterConfig.XRPSetting.Enable { - xrpConfig := &xrp.SplitterConfig{ + xrpConfig := &ripple.SplitterConfig{ p.xrpEngine, p.xrpConsumer, config.SplitterConfig.KafkaXRPSetting.Topic, @@ -770,7 +770,7 @@ func (p *Splitter) Run() { config.SplitterConfig.XRPSetting.DatabaseWorkerNumber, config.SplitterConfig.XRPSetting.DatabaseWorkerBuffer, } - xrpSplitter, err := xrp.NewSplitter(xrpConfig) + xrpSplitter, err := ripple.NewSplitter(xrpConfig) if err != nil { panic(err) } diff --git a/splitter/xrp/handler_rpc.go b/splitter/xrp/handler_rpc.go deleted file mode 100644 index bd17a80..0000000 --- a/splitter/xrp/handler_rpc.go +++ /dev/null @@ -1,126 +0,0 @@ -package xrp - -import ( - "errors" - "fmt" - "github.com/jdcloud-bds/bds/common/json" - "github.com/jdcloud-bds/bds/common/jsonrpc" - "github.com/jdcloud-bds/bds/common/log" - "github.com/jdcloud-bds/bds/common/math" - "math/big" - "strconv" - "strings" -) - -type rpcHandler struct { - client *jsonrpc.Client -} - -func newRPCHandler(c *jsonrpc.Client) (*rpcHandler, error) { - h := new(rpcHandler) - h.client = c - return h, nil -} - -func (h *rpcHandler) GetBlockNumber() (int64, error) { - defer stats.Add(MetricRPCCall, 1) - data, err := h.client.Call("xrp_blockNumber") - if err != nil { - return 0, err - } - v := json.GetBytes(data, "result").String() - if len(v) == 0 { - return 0, errors.New("cannot get block number") - } - number, err := strconv.ParseInt(v, 0, 64) - if err != nil { - return 0, err - } - - return number, nil -} - -func (h *rpcHandler) GetBalance(address string, height int64) (*big.Int, error) { - defer stats.Add(MetricRPCCall, 1) - if !strings.HasPrefix(address, "0x") { - address = fmt.Sprintf("0x%s", address) - } - - var hexNumber string - if height == int64(0) { - hexNumber = "latest" - } else { - hexNumber = fmt.Sprintf("%#x", height) - } - data, err := h.client.Call("xrp_getBalance", address, ""+hexNumber+"") - if err != nil { - log.DetailError(err) - return nil, err - } - tmp := json.GetBytes(data, "result").String() - value, err := math.ParseInt256(tmp) - if err != nil { - log.DetailError(err) - return nil, err - } - return value, nil -} - -func (h *rpcHandler) SendBlock(number int64) error { - defer stats.Add(MetricRPCCall, 1) - hexNumber := fmt.Sprintf("%#x", number) - _, err := h.client.Call("xrp_sendBlockByNumber", hexNumber, true) - if err != nil { - return err - } - return nil -} - -type CompleteLedgers struct { - startLedger int64 - endLedger int64 -} - -func (h *rpcHandler) GetCompleteLedgers() (map[int]*CompleteLedgers, error) { - totalCompleteLedgers := make(map[int]*CompleteLedgers, 0) - - res, err := h.client.CallXRP("server_info") - if err != nil { - return nil, err - } - data := string(res) - cl_str := json.Get(data, "result.info.complete_ledgers").String() - //cl_str demo:"47025320,47025422-47025425,47025527-47025528,47025629-47025661,47025763,47025864-47025877" - log.Info("splitter xrp: get completed ledgers: %s\n\n", cl_str) - if strings.ToLower(cl_str) == "empty" { - return nil, nil - } - - cl_arr := strings.Split(cl_str, ",") - for i, v := range cl_arr { - cl := new(CompleteLedgers) - ind := strings.Index(v, "-") - if ind > 0 { - cl.startLedger, _ = strconv.ParseInt(v[:ind], 10, 64) - cl.endLedger, _ = strconv.ParseInt(v[ind+1:], 10, 64) - } else { - cl.startLedger, _ = strconv.ParseInt(v, 10, 64) - cl.endLedger, _ = strconv.ParseInt(v, 10, 64) - } - totalCompleteLedgers[i] = cl - } - return totalCompleteLedgers, nil -} -func (h *rpcHandler) SendBatchBlock(startNumber, endNumber int64) error { - defer stats.Add(MetricRPCCall, 1) - params := make(map[string]interface{}, 0) - params["start_ledger_index"] = startNumber - params["end_ledger_index"] = endNumber - - res, err := h.client.CallXRP("send_batch_ledger", params) - log.Info("splitter xrp: send batch ledger res: %s", string(res)) - if err != nil { - return err - } - return nil -} diff --git a/splitter/xrp/job.go b/splitter/xrp/job.go deleted file mode 100644 index 56d52a2..0000000 --- a/splitter/xrp/job.go +++ /dev/null @@ -1,250 +0,0 @@ -package xrp - -import ( - "fmt" - "github.com/jdcloud-bds/bds/common/log" - "github.com/jdcloud-bds/bds/service" - model "github.com/jdcloud-bds/bds/service/model/xrp" - "strconv" - "time" -) - -type WorkerJob interface { - Run() - Name() string - run() error -} - -type updateMetaDataJob struct { - splitter *XRPSplitter - name string -} - -func newUpdateMetaDataJob(splitter *XRPSplitter) *updateMetaDataJob { - j := new(updateMetaDataJob) - j.splitter = splitter - j.name = "update meta data" - return j -} - -func (j *updateMetaDataJob) Run() { - _ = j.run() -} - -func (j *updateMetaDataJob) Name() string { - return j.name -} - -func (j *updateMetaDataJob) run() error { - startTime := time.Now() - db := service.NewDatabase(j.splitter.cfg.Engine) - metas := make([]*model.Meta, 0) - err := db.Find(&metas) - if err != nil { - log.Error("worker xrp: job '%s' get table list from meta error", j.name) - return err - } - - for _, meta := range metas { - cond := new(model.Meta) - cond.Name = meta.Name - data := new(model.Meta) - - var countSql string - if j.splitter.cfg.Engine.DriverName() == "mssql" { - countSql = fmt.Sprintf("SELECT b.rows AS count FROM sysobjects a INNER JOIN sysindexes b ON a.id = b.id WHERE a.type = 'u' AND b.indid in (0,1) AND a.name='%s'", meta.Name) - } else { - countSql = fmt.Sprintf("SELECT COUNT(1) FROM `%s`", meta.Name) - } - result, err := db.QueryString(countSql) - if err != nil { - log.Error("worker xrp: job %s get table %s count from meta error", j.name, meta.Name) - log.DetailError(err) - continue - } - if len(result) == 0 { - continue - } - count, _ := strconv.ParseInt(result[0]["count"], 10, 64) - - sql := db.Table(meta.Name).Cols("id").Desc("id").Limit(1, 0) - result, err = sql.QueryString() - if err != nil { - log.Error("worker xrp: job '%s' get table %s id from meta error", j.name, meta.Name) - log.DetailError(err) - continue - } - for _, v := range result { - id, _ := strconv.ParseInt(v["id"], 10, 64) - data.LastID = id - data.Count = count - _, err = db.Update(data, cond) - if err != nil { - log.Error("worker xrp: job '%s' update table %s meta error", j.name, meta.Name) - log.DetailError(err) - continue - } - - } - } - stats.Add(MetricCronWorkerJobUpdateMetaData, 1) - elaspedTime := time.Now().Sub(startTime) - log.Debug("worker xrp: job '%s' elasped time %s", j.name, elaspedTime.String()) - return nil -} - -type getBatchBlockJob struct { - splitter *XRPSplitter - name string -} - -func newGetBatchBlockJob(splitter *XRPSplitter) *getBatchBlockJob { - j := new(getBatchBlockJob) - j.splitter = splitter - j.name = "'get batch block'" - return j -} - -func (j *getBatchBlockJob) Run() { - _ = j.run() -} - -func (j *getBatchBlockJob) Name() string { - return j.name -} - -func (j *getBatchBlockJob) run0() error { - startTime := time.Now() - db := service.NewDatabase(j.splitter.cfg.Engine) - totalCompleteLedgers, err := j.splitter.remoteHandler.GetCompleteLedgers() - if err != nil { - log.Error("worker xrp: job %s get closed ledgers error.", j.name) - log.Error("worker xrp: %s", err.Error()) - } - batchSize := int64(1000) - - ledgerList := make(map[int64]bool, 0) - sql := "select ledger_index from xrp_block order by ledger_index asc" - result, err := db.QueryString(sql) - if err != nil { - log.Error("worker xrp: get ledger_index from database error") - log.DetailError(err) - return err - } - for _, v := range result { - ledger_index, _ := strconv.ParseInt(v["ledger_index"], 10, 64) - ledgerList[ledger_index] = true - } - for _, cl := range totalCompleteLedgers { - missedLedger := make(map[int64]bool, 0) - i := int64(0) - sendStart := cl.endLedger - sendEnd := cl.startLedger - for k := cl.startLedger; k <= cl.endLedger; k++ { - if _, ok := ledgerList[k]; !ok { - missedLedger[k] = true - if k < sendStart { - sendStart = k - } - if k > sendEnd { - sendEnd = k - } - } - i++ - if i >= batchSize { - if sendEnd >= sendStart && len(missedLedger) > 0 { - log.Info("splitter xrp: send batch block from %d to %d.", sendStart, sendEnd) - err = j.splitter.remoteHandler.SendBatchBlock(sendStart, sendEnd) - if err != nil { - log.Error("splitter xrp: send batch block error: %s", err.Error()) - return err - } - } - missedLedger = make(map[int64]bool, 0) - i = 0 - sendStart = cl.endLedger - sendEnd = k + 1 - - } - } - if len(missedLedger) > 0 { - if sendEnd >= sendStart { - log.Info("splitter xrp: send batch block from %d to %d.", sendStart, sendEnd) - err = j.splitter.remoteHandler.SendBatchBlock(sendStart, sendEnd) - if err != nil { - log.Error("splitter xrp: send batch block error: %s", err.Error()) - return err - } - } - } - } - - elaspedTime := time.Now().Sub(startTime) - log.Debug("worker xrp: job '%s' elasped time %s", j.name, elaspedTime.String()) - return nil -} - -func (j *getBatchBlockJob) run() error { - startTime := time.Now() - db := service.NewDatabase(j.splitter.cfg.Engine) - totalCompleteLedgers, err := j.splitter.remoteHandler.GetCompleteLedgers() - if err != nil { - log.Error("worker xrp: job %s get closed ledgers error\n\n", j.name) - log.Error("worker xrp: %s", err.Error()) - } - batchSize := int64(1000) - - ledgerList := make(map[int64]bool, 0) - sql := "select ledger_index from xrp_block order by ledger_index asc" - result, err := db.QueryString(sql) - if err != nil { - log.Error("worker xrp: get ledger_index from database error") - log.DetailError(err) - return err - } - for _, v := range result { - ledger_index, _ := strconv.ParseInt(v["ledger_index"], 10, 64) - ledgerList[ledger_index] = true - } - for _, cl := range totalCompleteLedgers { - missedLedger := make(map[int64]bool, 0) - i := int64(0) - sendStart := int64(0) - sendEnd := int64(0) - countStart := false - for k := cl.startLedger; k <= cl.endLedger; k++ { - if _, ok := ledgerList[k]; !ok { - if !countStart { - sendStart = k - sendEnd = k - countStart = true - } else { - sendEnd = k - } - missedLedger[k] = true - i++ - } else if countStart { - break - } - - if i >= batchSize { - break - } - } - if len(missedLedger) > 0 { - if sendEnd >= sendStart { - log.Info("splitter xrp: send batch block from %d to %d.", sendStart, sendEnd) - err = j.splitter.remoteHandler.SendBatchBlock(sendStart, sendEnd) - if err != nil { - log.Error("splitter xrp: send batch block error: %s", err.Error()) - return err - } - } - } - - } - - elaspedTime := time.Now().Sub(startTime) - log.Debug("worker xrp: job '%s' elasped time %s", j.name, elaspedTime.String()) - return nil -} diff --git a/splitter/xrp/meta.go b/splitter/xrp/meta.go deleted file mode 100644 index 2eabcc8..0000000 --- a/splitter/xrp/meta.go +++ /dev/null @@ -1,49 +0,0 @@ -package xrp - -import ( - "github.com/jdcloud-bds/bds/common/cuckoofilter" - "github.com/jdcloud-bds/bds/common/metric" - model "github.com/jdcloud-bds/bds/service/model/xrp" - "math/big" - "sync" -) - -const ( - MetricReceiveMessages = "receive_messages" - MetricParseDataError = "parse_data_error" - MetricVaildationSuccess = "validation_success" - MetricVaildationError = "validation_error" - MetricDatabaseRollback = "database_rollback" - MetricDatabaseCommit = "database_commit" - MetricCronWorkerJob = "cron_worker_job" - MetricCronWorkerJobUpdateMetaData = "cron_worker_job_update_meta_data" - MetricCronWorkerJobGetBatchBlock = "cron_worker_job_get_batch_block" - MetricCronWorkerJobRefreshContractAddresses = "cron_worker_job_refresh_contract_addresses" - MetricCronWorkerJobRefreshPoolName = "cron_worker_job_refresh_pool_name" - MetricRPCCall = "rpc_call" - MetricRevertBlock = "revert_block" - - AccountTypeNormal = 0 - AccountTypeContract = 1 - AccountTypeMiner = 2 - - TransactionTypeNormal = 0 - TransactionTypeContract = 1 -) - -var ( - stats = metric.NewMap("xrp") - contractAddressFilter = cuckoofilter.New() - poolNameMap = new(sync.Map) - maxBigNumber, _ = new(big.Int).SetString("100000000000000000000000000000000000000", 10) - defaultBigNumber, _ = new(big.Int).SetString("-1", 10) -) - -type XRPBlockData struct { - Block *model.Block - Transactions []*model.Transaction - Accounts []*model.Account - AffectedNodes []*model.AffectedNodes - Paths []*model.Path - Amounts []*model.Amount -} diff --git a/splitter/xrp/method.go b/splitter/xrp/method.go deleted file mode 100644 index d1589f9..0000000 --- a/splitter/xrp/method.go +++ /dev/null @@ -1,266 +0,0 @@ -package xrp - -import ( - "fmt" - "github.com/jdcloud-bds/bds/common/json" - "github.com/jdcloud-bds/bds/common/log" - "github.com/jdcloud-bds/bds/common/math" - "github.com/jdcloud-bds/bds/service" - model "github.com/jdcloud-bds/bds/service/model/xrp" - "strings" - "time" -) - -func FormatAmount(transaction *model.Transaction, input string) *model.Amount { - amount := new(model.Amount) - amount.LedgerIndex = transaction.LedgerIndex - amount.CloseTime = transaction.CloseTime - amount.ParentHash = transaction.Hash - amount.Currency = json.Get(input, "currency").String() - amount.Issuer = json.Get(input, "issuer").String() - amount.Value = json.Get(input, "value").String() - return amount -} -func ParseBlock(data string) (*XRPBlockData, error) { - startTime := time.Now() - var err error - - b := new(XRPBlockData) - b.Block = new(model.Block) - b.Transactions = make([]*model.Transaction, 0) - b.AffectedNodes = make([]*model.AffectedNodes, 0) - b.Paths = make([]*model.Path, 0) - b.Amounts = make([]*model.Amount, 0) - - b.Block.Accepted = int(json.Get(data, "accepted").Int()) - b.Block.AccountHash = json.Get(data, "account_hash").String() - b.Block.CloseFlags = int(json.Get(data, "close_flags").Int()) - b.Block.CloseTime = json.Get(data, "close_time").Int() - b.Block.CloseTimeHuman = json.Get(data, "close_time_human").String() - b.Block.CloseTimeResolution = int(json.Get(data, "close_time_resolution").Int()) - b.Block.Closed = int(json.Get(data, "closed").Int()) - b.Block.Hash = json.Get(data, "hash").String() - b.Block.LedgerHash = json.Get(data, "ledger_hash").String() - b.Block.LedgerIndex = json.Get(data, "ledger_index").Int() - b.Block.ParentCloseTime = json.Get(data, "parent_close_time").Int() - b.Block.ParentHash = json.Get(data, "parent_hash").String() - b.Block.SeqNum = json.Get(data, "seqNum").Int() - totalCoins := json.Get(data, "total_coins").String() - b.Block.TotalCoins, err = parseBigInt(totalCoins) - if err != nil { - log.Error("splitter xrp: block %d TotalCoins '%s' parse error", b.Block.LedgerIndex, totalCoins) - return nil, err - } - b.Block.TransactionHash = json.Get(data, "transaction_hash").String() - - txList := json.Get(data, "transactions").Array() - for _, txItem := range txList { - transaction := new(model.Transaction) - transaction.Account = json.Get(txItem.String(), "Account").String() - transaction.TransactionType = json.Get(txItem.String(), "TransactionType").String() - transaction.Fee = json.Get(txItem.String(), "Fee").Int() - transaction.Sequence = json.Get(txItem.String(), "Sequence").Int() - transaction.AccountTxnID = json.Get(txItem.String(), "AccountTxnID").String() - transaction.Flags = json.Get(txItem.String(), "Flags").Int() - transaction.LastLedgerSequence = json.Get(txItem.String(), "LastLedgerSequence").Int() - transaction.Memos = json.Get(txItem.String(), "Memos").String() - transaction.Signers = json.Get(txItem.String(), "Signers").String() - transaction.SourceTag = json.Get(txItem.String(), "SourceTag").Int() - transaction.SigningPubKey = json.Get(txItem.String(), "SigningPubKey").String() - transaction.TxnSignature = json.Get(txItem.String(), "TxnSignature").String() - - transaction.Hash = json.Get(txItem.String(), "hash").String() - transaction.LedgerIndex = b.Block.LedgerIndex - transaction.TransactionResult = json.Get(txItem.String(), "metaData.TransactionResult").String() - transaction.TransactionIndex = int(json.Get(txItem.String(), "metaData.TransactionIndex").Int()) - transaction.Validated = int(json.Get(txItem.String(), "validated").Int()) - transaction.Date = json.Get(txItem.String(), "date").Int() - - transaction.CloseTime = b.Block.CloseTime - tempAmount := json.Get(txItem.String(), "Amount").String() - if strings.Contains(tempAmount, "{") { - amount := FormatAmount(transaction, tempAmount) - amount.AmountType = 1 - b.Amounts = append(b.Amounts, amount) - transaction.Amount = -1 - } else { - transaction.Amount = json.Get(txItem.String(), "Amount").Int() - } - tempSendMax := json.Get(txItem.String(), "SendMax").String() - if strings.Contains(tempSendMax, "{") { - amount := FormatAmount(transaction, tempSendMax) - amount.AmountType = 2 - b.Amounts = append(b.Amounts, amount) - transaction.SendMax = -1 - } else { - transaction.SendMax = json.Get(txItem.String(), "SendMax").Int() - } - - tempDeliverMin := json.Get(txItem.String(), "DeliverMin").String() - if strings.Contains(tempDeliverMin, "{") { - amount := FormatAmount(transaction, tempDeliverMin) - amount.AmountType = 2 - b.Amounts = append(b.Amounts, amount) - transaction.DeliverMin = -1 - } else { - transaction.DeliverMin = json.Get(txItem.String(), "DeliverMin").Int() - } - - transaction.Destination = json.Get(txItem.String(), "Destination").String() - transaction.DestinationTag = json.Get(txItem.String(), "DestinationTag").Int() - transaction.InvoiceID = json.Get(txItem.String(), "InvoiceID").String() - transaction.Expiration = json.Get(txItem.String(), "Expiration").Int() - transaction.OfferSequence = int(json.Get(txItem.String(), "OfferSequence").Int()) - tempTakerGets := json.Get(txItem.String(), "TakerGets").String() - if strings.Contains(tempTakerGets, "{") { - amount := FormatAmount(transaction, tempTakerGets) - amount.AmountType = 4 - b.Amounts = append(b.Amounts, amount) - transaction.TakerGets = -1 - } else { - transaction.TakerGets = json.Get(txItem.String(), "TakerGets").Int() - } - tempTakerPays := json.Get(txItem.String(), "TakerPays").String() - if strings.Contains(tempTakerPays, "{") { - amount := FormatAmount(transaction, tempTakerPays) - amount.AmountType = 5 - b.Amounts = append(b.Amounts, amount) - transaction.TakerPays = -1 - } else { - transaction.TakerPays = json.Get(txItem.String(), "TakerPays").Int() - } - tempLimitAmount := json.Get(txItem.String(), "LimitAmount").String() - if strings.Contains(tempLimitAmount, "{") { - amount := FormatAmount(transaction, tempLimitAmount) - amount.AmountType = 6 - b.Amounts = append(b.Amounts, amount) - transaction.LimitAmount = -1 - } else { - transaction.LimitAmount = json.Get(txItem.String(), "LimitAmount").Int() - } - - transaction.QualityIn = json.Get(txItem.String(), "QualityIn").Int() - transaction.QualityOut = json.Get(txItem.String(), "QualityOut").Int() - transaction.ClearFlag = int(json.Get(txItem.String(), "ClearFlag").Int()) - transaction.Domain = json.Get(txItem.String(), "Domain").String() - transaction.EmailHash = json.Get(txItem.String(), "EmailHash").String() - transaction.MessageKey = json.Get(txItem.String(), "MessageKey").String() - transaction.SetFlag = int(json.Get(txItem.String(), "SetFlag").Int()) - transaction.TransferRate = int(json.Get(txItem.String(), "TransferRate").Int()) - - transaction.TickSize = int(json.Get(txItem.String(), "TickSize").Int()) - transaction.RegularKey = json.Get(txItem.String(), "RegularKey").String() - transaction.SignerQuorum = int(json.Get(txItem.String(), "SignerQuorum").Int()) - transaction.CancelAfter = json.Get(txItem.String(), "CancelAfter").Int() - transaction.FinishAfter = json.Get(txItem.String(), "FinishAfter").Int() - transaction.Condition = json.Get(txItem.String(), "Condition").String() - transaction.Owner = json.Get(txItem.String(), "Owner").String() - - transaction.Fulfillment = json.Get(txItem.String(), "Fulfillment").String() - transaction.SettleDelay = json.Get(txItem.String(), "SettleDelay").Int() - transaction.PublicKey = json.Get(txItem.String(), "PublicKey").String() - transaction.Channel = json.Get(txItem.String(), "Channel").String() - transaction.Balance = json.Get(txItem.String(), "Balance").Int() - transaction.Authorize = json.Get(txItem.String(), "Authorize").String() - transaction.UnAuthorize = json.Get(txItem.String(), "UnAuthorize").String() - - //paths - txPaths := json.Get(txItem.String(), "Paths").Array() - p := int64(0) - for _, pathItem := range txPaths { - inPaths := pathItem.Array() - i := int64(0) - for _, pathAgent := range inPaths { - path := new(model.Path) - path.CloseTime = b.Block.CloseTime - path.LedgerIndex = b.Block.LedgerIndex - path.Type = json.Get(pathAgent.String(), "type").Int() - path.Currency = json.Get(pathAgent.String(), "currency").String() - path.Issuer = json.Get(pathAgent.String(), "issuer").String() - path.ParentHash = transaction.Hash - path.InTxIndex = p - path.InPathIndex = i - - b.Paths = append(b.Paths, path) - i++ - } - p++ - } - //affectedNodes - txNodes := json.Get(txItem.String(), "metaData.AffectedNodes").Array() - for _, nodeItem := range txNodes { - node := new(model.AffectedNodes) - node.CloseTime = b.Block.CloseTime - node.LedgerIndex = b.Block.LedgerIndex - node.ParentHash = transaction.Hash - for key, value := range nodeItem.Map() { - node.NodeType = key - node.LedgerEntryType = json.Get(value.String(), "LedgerEntryType").String() - node.NodeLedgerIndex = json.Get(value.String(), "LedgerIndex").String() - node.PreviousTxnID = json.Get(value.String(), "PreviousTxnID").String() - node.PreviousTxnLgrSeq = json.Get(value.String(), "PreviousTxnLgrSeq").Int() - node.FullJsonStr = value.String() - } - - b.AffectedNodes = append(b.AffectedNodes, node) - } - transaction.PathesLen = len(txPaths) - transaction.AffectedNodesLen = len(txNodes) - - b.Transactions = append(b.Transactions, transaction) - } - - b.Block.TransactionLength = len(b.Transactions) - - elaspedTime := time.Now().Sub(startTime) - log.Debug("splitter xrp: parse block %d, txs %d, elasped time %s", b.Block.LedgerIndex, b.Block.TransactionLength, elaspedTime.String()) - - return b, nil -} - -func revertMiner(height int64, tx *service.Transaction) error { - startTime := time.Now() - index := "revert_miner" - sql := fmt.Sprintf("UPDATE a SET a.miner_count = a.miner_count - 1 FROM xrp_account a"+ - " JOIN (SELECT miner FROM xrp_block WHERE height = '%d') b"+ - " ON a.address = b.miner ", height) - affected1, err := tx.Execute(sql) - if err != nil { - log.DetailError(err) - return err - } - sql = fmt.Sprintf("UPDATE a SET a.miner_uncle_count = a.miner_uncle_count - 1 FROM xrp_account a"+ - " JOIN (SELECT miner FROM xrp_block WHERE height = '%d') b"+ - " ON a.address = b.miner ", height) - affected2, err := tx.Execute(sql) - if err != nil { - log.DetailError(err) - return err - } - elaspedTime := time.Now().Sub(startTime) - log.Debug("splitter xrp index: %s affected %d %d elasped %s", index, affected1, affected2, elaspedTime.String()) - return nil -} - -func removeHexPrefixAndToLower(s string) string { - return strings.ToLower(strings.TrimPrefix(s, "0x")) -} - -func parseBigInt(s string) (math.HexOrDecimal256, error) { - var n math.HexOrDecimal256 - if s == "0x" { - s = "0x0" - } - - v, ok := math.ParseBig256(s) - if !ok { - n = math.HexOrDecimal256(*defaultBigNumber) - } else { - if v.Cmp(maxBigNumber) >= 0 { - n = math.HexOrDecimal256(*defaultBigNumber) - } else { - n = math.HexOrDecimal256(*v) - } - } - return n, nil -} diff --git a/splitter/xrp/splitter.go b/splitter/xrp/splitter.go deleted file mode 100644 index 4f9e44f..0000000 --- a/splitter/xrp/splitter.go +++ /dev/null @@ -1,404 +0,0 @@ -package xrp - -import ( - "fmt" - "github.com/go-xorm/xorm" - "github.com/jdcloud-bds/bds/common/httputils" - "github.com/jdcloud-bds/bds/common/jsonrpc" - "github.com/jdcloud-bds/bds/common/kafka" - "github.com/jdcloud-bds/bds/common/log" - "github.com/jdcloud-bds/bds/service" - model "github.com/jdcloud-bds/bds/service/model/xrp" - "github.com/xeipuuv/gojsonschema" - "strconv" - "strings" - "time" -) - -type SplitterConfig struct { - Engine *xorm.Engine - Consumer *kafka.ConsumerGroup - Topic string - DatabaseEnable bool - MaxBatchBlock int - Endpoint string - User string - Password string - JSONSchemaFile string - JSONSchemaValidationEnable bool - DatabaseWorkerNumber int - DatabaseWorkerBuffer int -} - -type XRPSplitter struct { - cfg *SplitterConfig - remoteHandler *rpcHandler - cronWorker *CronWorker - jsonSchemaLoader gojsonschema.JSONLoader - missedBlockList map[int64]bool - latestSaveDataTimestamp time.Time - latestReceiveMessageTimestamp time.Time - databaseWorkerChan chan *XRPBlockData - databaseWorkerStopChan chan bool -} - -func NewSplitter(cfg *SplitterConfig) (*XRPSplitter, error) { - var err error - s := new(XRPSplitter) - s.cfg = cfg - s.databaseWorkerChan = make(chan *XRPBlockData, cfg.DatabaseWorkerBuffer) - s.databaseWorkerStopChan = make(chan bool, 0) - s.missedBlockList = make(map[int64]bool, 0) - httpClient := httputils.NewRestClientWithBasicAuth(s.cfg.User, s.cfg.Password) - s.remoteHandler, err = newRPCHandler(jsonrpc.New(httpClient, s.cfg.Endpoint)) - if err != nil { - log.DetailError(err) - } - - if s.cfg.JSONSchemaValidationEnable { - f := fmt.Sprintf("file://%s", s.cfg.JSONSchemaFile) - s.jsonSchemaLoader = gojsonschema.NewReferenceLoader(f) - } - - s.cronWorker = NewCronWorker(s) - err = s.cronWorker.Prepare() - if err != nil { - log.DetailError(err) - return nil, err - } - - return s, nil -} - -func (s *XRPSplitter) Start() { - err := s.cronWorker.Start() - if err != nil { - log.Error("splitter xrp: cron worker start error") - log.DetailError(err) - return - } - - err = s.cfg.Consumer.Start(s.cfg.Topic) - if err != nil { - log.Error("splitter xrp: consumer start error") - log.DetailError(err) - return - } - - for i := 0; i < s.cfg.DatabaseWorkerNumber; i++ { - go s.databaseWorker(i) - } - - log.Debug("splitter xrp: consumer start topic %s", s.cfg.Topic) - log.Debug("splitter xrp: database enable is %v", s.cfg.DatabaseEnable) - - for { - select { - case message := <-s.cfg.Consumer.MessageChannel(): - stats.Add(MetricReceiveMessages, 1) - s.latestReceiveMessageTimestamp = time.Now() - - START: - if s.cfg.JSONSchemaValidationEnable { - ok, err := s.jsonSchemaValid(string(message.Data)) - if err != nil { - log.Error("splitter xrp: json schema valid error") - } - if !ok { - log.Warn("splitter xrp: json schema valid failed") - } - } - - data, err := ParseBlock(string(message.Data)) - if err != nil { - stats.Add(MetricParseDataError, 1) - log.Error("splitter xrp: block parse error, retry after 5s") - log.DetailError(err) - time.Sleep(time.Second * 5) - goto START - } - - if s.cfg.DatabaseEnable { - s.databaseWorkerChan <- data - s.cfg.Consumer.MarkOffset(message) - } - } - } -} - -func (s *XRPSplitter) Stop() { - s.cronWorker.Stop() -} - -func (s *XRPSplitter) CheckBlock(curBlock *XRPBlockData) (bool, int64) { - db := service.NewDatabase(s.cfg.Engine) - height := int64(-1) - prevBlock := make([]*model.Block, 0) - err := db.Where("height = ?", curBlock.Block.LedgerIndex-1).Find(&prevBlock) - if err != nil { - log.DetailError(err) - return false, height - } - - if len(prevBlock) != 1 { - log.Warn("splitter xrp: can not find previous block %d", curBlock.Block.LedgerIndex-1) - blocks := make([]*model.Block, 0) - err = db.Desc("height").Limit(1).Find(&blocks) - if err != nil { - log.DetailError(err) - } else { - if len(blocks) > 0 { - height = blocks[0].LedgerIndex + 1 - } else { - log.Warn("splitter xrp: database empty") - height = 0 - } - } - return false, height - } - - if prevBlock[0].Hash != curBlock.Block.ParentHash { - log.Warn("splitter xrp: block %d is revert", curBlock.Block.LedgerIndex-1) - err = s.remoteHandler.SendBatchBlock(prevBlock[0].LedgerIndex, curBlock.Block.LedgerIndex) - if err != nil { - log.DetailError(err) - } - height = prevBlock[0].LedgerIndex - return false, height - } - log.Debug("splitter xrp: check block %d pass", curBlock.Block.LedgerIndex) - return true, height -} - -func (s *XRPSplitter) SaveBlock(data *XRPBlockData) error { - startTime := time.Now() - tx := service.NewTransaction(s.cfg.Engine) - defer tx.Close() - - err := tx.Begin() - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - blockTemp := new(model.Block) - blockTemp.LedgerIndex = data.Block.LedgerIndex - has, err := tx.Get(blockTemp) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - if has { - if blockTemp.Hash == data.Block.Hash || blockTemp.CloseTime < data.Block.CloseTime { - log.Warn("splitter xrp: block %d has been stored", data.Block.LedgerIndex) - _ = tx.Rollback() - return nil - } else { - log.Warn("splitter xrp: block %d need to be replaced by the new one", data.Block.LedgerIndex) - s.revertLedger(data.Block.LedgerIndex) - } - } - - var affected int64 - blocks := make([]*model.Block, 0) - blocks = append(blocks, data.Block) - affected, err = tx.BatchInsert(blocks) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - log.Debug("splitter xrp: block write %d rows", affected) - - affected, err = tx.BatchInsert(data.Transactions) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - log.Debug("splitter xrp: transaction write %d rows", affected) - - affected, err = tx.BatchInsert(data.AffectedNodes) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - log.Debug("splitter xrp: affected nodes write %d rows", affected) - - affected, err = tx.BatchInsert(data.Paths) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - log.Debug("splitter xrp: paths write %d rows", affected) - - affected, err = tx.BatchInsert(data.Amounts) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - log.Debug("splitter xrp: amounts write %d rows", affected) - err = tx.Commit() - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - return err - } - tx.Close() - stats.Add(MetricDatabaseCommit, 1) - elaspedTime := time.Now().Sub(startTime) - s.latestSaveDataTimestamp = time.Now() - log.Debug("splitter xrp: block %d write done elasped: %s", data.Block.LedgerIndex, elaspedTime.String()) - return nil -} -func (s *XRPSplitter) revertLedger(ledgerIndex int64) error { - tx := service.NewTransaction(s.cfg.Engine) - defer tx.Close() - - err := tx.Begin() - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - sql := fmt.Sprintf("delete from xrp_block where ledger_index = %d", ledgerIndex) - _, err = tx.Exec(sql) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - sql = fmt.Sprintf("delete from xrp_transaction where ledger_index = %d", ledgerIndex) - _, err = tx.Exec(sql) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - sql = fmt.Sprintf("delete from xrp_amount where ledger_index = %d", ledgerIndex) - _, err = tx.Exec(sql) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - sql = fmt.Sprintf("delete from xrp_transaction_affected_nodes where ledger_index = %d", ledgerIndex) - _, err = tx.Exec(sql) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - sql = fmt.Sprintf("delete from xrp_path where ledger_index = %d", ledgerIndex) - _, err = tx.Exec(sql) - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - stats.Add(MetricDatabaseRollback, 1) - return err - } - err = tx.Commit() - if err != nil { - _ = tx.Rollback() - log.DetailError(err) - return err - } - tx.Close() - return nil -} -func (s *XRPSplitter) CheckMissedBlock() ([]int64, error) { - missedList := make([]int64, 0) - - db := service.NewDatabase(s.cfg.Engine) - sql := fmt.Sprintf("SELECT height FROM xrp_block ORDER BY height ASC") - data, err := db.QueryString(sql) - if err != nil { - return nil, err - } - - blockList := make([]*model.Block, 0) - for _, v := range data { - block := new(model.Block) - tmp := v["height"] - height, err := strconv.ParseInt(tmp, 10, 64) - if err != nil { - return nil, err - } - block.LedgerIndex = height - blockList = append(blockList, block) - } - - if len(blockList) > 0 { - checkList := make(map[int64]bool, 0) - for _, b := range blockList { - checkList[b.LedgerIndex] = true - } - - for i := int64(0); i <= blockList[len(blockList)-1].LedgerIndex; i++ { - if _, ok := checkList[i]; !ok { - missedList = append(missedList, i) - } - } - } - - return missedList, nil -} - -func (s *XRPSplitter) jsonSchemaValid(data string) (bool, error) { - startTime := time.Now() - dataLoader := gojsonschema.NewStringLoader(data) - result, err := gojsonschema.Validate(s.jsonSchemaLoader, dataLoader) - if err != nil { - log.Error("splitter xrp: json schema validation error") - log.DetailError(err) - return false, err - } - if !result.Valid() { - for _, err := range result.Errors() { - log.Error("splitter xrp: data invalid %s", strings.ToLower(err.String())) - return false, nil - } - stats.Add(MetricVaildationError, 1) - } else { - stats.Add(MetricVaildationSuccess, 1) - } - elaspedTime := time.Now().Sub(startTime) - log.Debug("splitter xrp: json schema validation elasped %s", elaspedTime) - return true, nil -} - -func (s *XRPSplitter) databaseWorker(i int) { - log.Info("splitter xrp: starting database worker %d", i) - for { - select { - case data := <-s.databaseWorkerChan: - err := s.SaveBlock(data) - if err != nil { - log.Error("splitter xrp: block %d save error, retry after 5s", data.Block.LedgerIndex) - log.DetailError(err) - } - case stop := <-s.databaseWorkerStopChan: - if stop { - msg := fmt.Sprintf("splitter xrp: database worker %d stopped", i) - log.Info("splitter xrp: ", msg) - return - } - } - } -} -- GitLab