提交 12d2ea8a 编写于 作者: E enhellowhy 提交者: wanghuanyu3

feat: restructure ripple splitter module.

上级 e0bcf208
......@@ -86,8 +86,8 @@ func (c *Client) CallXRP(method string, params ...interface{}) ([]byte, error) {
errorMap := json.GetBytes(response, "error").Map()
if len(errorMap) != 0 {
code := json.GetBytes(response, "error.code").Int()
msg := json.GetBytes(response, "error.message").String()
code := json.GetBytes(response, "error_code").Int()
msg := json.GetBytes(response, "error_message").String()
return nil, errors.New(fmt.Sprintf("error: %d %s", code, msg))
}
......
......@@ -555,6 +555,63 @@ max_idle_conns = 100
sql_log_file = /var/log/bds-splitter/xlm-sql.log
debug = false
# =============================== xrp ==================================
[xrp]
#是否开启 xrp 数据splitter
enable = true
#是否开启数据库
database_enable = true
#数据库worker缓存大小
database_worker_buffer = 8192
#数据库worker数量
database_worker_number = 20
#一次请求区块链数据的最大块数,400000块之前可以设置大一些,比如300
max_batch_block = 1000
#xrp 全节点的地址
endpoint = http://[xrp 全节点的ip/域名]:[xrp 全节点运行端口]
#xrp 数据校验规则文件地址
json_schema_file = /etc/bds-splitter/schema/xrp.json
#xrp 数据校验是否开启
json_schema_validation_enable = false
#xrp 定时任务配置
[cron.xrp]
update_meta_expr = @every 1m
get_batch_ledger_expr = @every 1m
#xrp kafka 配置
[kafka.xrp]
enable = true
topic = xrp
# kafka 客户端标示
client_id = xrp-client-1
# kafka 消费组标示
group_id = xrp-group
# kafka 服务的地址
broker_list = [kafka 服务的ip/域名]:[kafka 服务的运行端口]
buffer_size = 1000
return_errors = true
#xrp 数据库配置
[database.xrp]
#数据库类型,sql server为mssql,postgre为postgres
type = postgres
#数据库的访问地址
host = [数据库服务的ip/域名]
#数据库的端口信息
port = [数据库服务的端口]
#数据库的库名,需要初始化好,创建表和导入数据用
database = [数据库服务的库]
#数据库的访问账号
user = [数据库服务的账号]
#数据库的访问账号密码信息
password = [数据库服务的密码]
timezone = Asia/Shanghai
max_open_conns = 500
max_idle_conns = 100
sql_log_file = /var/log/bds-splitter/xrp-sql.log
debug = false
# =============================== log ==================================
#普通日志配置
......
......@@ -231,8 +231,6 @@ type XRP struct {
DatabaseEnable bool `ini:"database_enable"`
DatabaseWorkerBuffer int `ini:"database_worker_buffer"`
DatabaseWorkerNumber int `ini:"database_worker_number"`
SkipHeight int `ini:"skip_height"`
SkipMissBlock bool `ini:"skip_miss_block"`
MaxBatchBlock int `ini:"max_batch_block"`
Endpoint string `ini:"endpoint"`
KafkaProxyHost string `ini:"kafka_proxy_host"`
......@@ -303,9 +301,8 @@ type CronTRON struct {
UpdateMetaExpr string `ini:"update_meta_expr"`
}
type CronXRP struct {
UpdateMetaExpr string `ini:"update_meta_expr"`
GetBatchBlockExpr string `ini:"get_batch_block_expr"`
RefreshPoolNameExpr string `ini:"refresh_pool_name_expr"`
UpdateMetaExpr string `ini:"update_meta_expr"`
GetBatchLedgerExpr string `ini:"get_batch_ledger_expr"`
}
type CronXLM struct {
......
......@@ -14,9 +14,9 @@ import (
"github.com/jdcloud-bds/bds/service/model/etc"
"github.com/jdcloud-bds/bds/service/model/eth"
"github.com/jdcloud-bds/bds/service/model/ltc"
"github.com/jdcloud-bds/bds/service/model/ripple"
"github.com/jdcloud-bds/bds/service/model/tron"
"github.com/jdcloud-bds/bds/service/model/xlm"
"github.com/jdcloud-bds/bds/service/model/xrp"
)
var (
......@@ -130,14 +130,23 @@ var (
new(eos.Transaction).TableName(): new(eos.Transaction),
new(eos.Action).TableName(): new(eos.Action),
// xrp
new(xrp.Meta).TableName(): new(xrp.Meta),
new(xrp.Block).TableName(): new(xrp.Block),
new(xrp.Transaction).TableName(): new(xrp.Transaction),
new(xrp.Path).TableName(): new(xrp.Path),
new(xrp.Account).TableName(): new(xrp.Account),
new(xrp.AffectedNodes).TableName(): new(xrp.AffectedNodes),
new(xrp.Amount).TableName(): new(xrp.Amount),
// ripple
new(ripple.Meta).TableName(): new(ripple.Meta),
new(ripple.Ledger).TableName(): new(ripple.Ledger),
new(ripple.AccountSet).TableName(): new(ripple.AccountSet),
new(ripple.DepositPreauth).TableName(): new(ripple.DepositPreauth),
new(ripple.EscrowCancel).TableName(): new(ripple.EscrowCancel),
new(ripple.EscrowCreate).TableName(): new(ripple.EscrowCreate),
new(ripple.EscrowFinish).TableName(): new(ripple.EscrowFinish),
new(ripple.OfferCancel).TableName(): new(ripple.OfferCancel),
new(ripple.OfferCreate).TableName(): new(ripple.OfferCreate),
new(ripple.Payment).TableName(): new(ripple.Payment),
new(ripple.PaymentChannelClaim).TableName(): new(ripple.PaymentChannelClaim),
new(ripple.PaymentChannelCreate).TableName(): new(ripple.PaymentChannelCreate),
new(ripple.PaymentChannelFund).TableName(): new(ripple.PaymentChannelFund),
new(ripple.SetRegularKey).TableName(): new(ripple.SetRegularKey),
new(ripple.SignerListSet).TableName(): new(ripple.SignerListSet),
new(ripple.TrustSet).TableName(): new(ripple.TrustSet),
// doge
new(doge.Meta).TableName(): new(doge.Meta),
......@@ -456,13 +465,13 @@ func CheckEOSTable(engine *xorm.Engine) error {
}
func CheckXRPTable(engine *xorm.Engine) error {
err := syncTable(engine, fmt.Sprintf("%s_meta", xrp.TablePrefix), nil)
err := syncTable(engine, fmt.Sprintf("%s_meta", ripple.TablePrefix), nil)
if err != nil {
return err
}
fn := func(tableName string) error {
meta := new(xrp.Meta)
meta := new(ripple.Meta)
meta.Name = engine.TableName(tableName)
if meta.Name != meta.TableName() {
has, err := engine.Get(meta)
......@@ -482,7 +491,7 @@ func CheckXRPTable(engine *xorm.Engine) error {
return nil
}
err = syncTable(engine, xrp.TablePrefix, fn)
err = syncTable(engine, ripple.TablePrefix, fn)
if err != nil {
return err
}
......
package xrp
package ripple
import (
"github.com/jdcloud-bds/bds/common/math"
)
type Block struct {
type Ledger struct {
ID int64 `xorm:"id bigint autoincr pk"`
Accepted int `xorm:"accepted tinyint notnull"`
AccountHash string `xorm:"account_hash varchar(128) notnull index"`
AccountHash string `xorm:"account_hash varchar(128) notnull"`
CloseFlags int `xorm:"close_flags tinyint notnull"`
Timestamp int64 `xorm:"timestamp int notnull index"`
CloseTime int64 `xorm:"close_time int notnull"`
CloseTimeHuman string `xorm:"close_time_human varchar(56) notnull"`
CloseTimeResolution int `xorm:"close_time_resolution int notnull"`
......@@ -18,12 +19,12 @@ type Block struct {
LedgerIndex int64 `xorm:"ledger_index bigint unique notnull index"`
ParentCloseTime int64 `xorm:"parent_close_time int notnull"`
ParentHash string `xorm:"parent_hash varchar(128) notnull"`
SeqNum int64 `xorm:"seq_num bigint notnull index"`
SeqNum int64 `xorm:"seq_num bigint notnull"`
TotalCoins math.HexOrDecimal256 `xorm:"total_coins decimal(38,0) null"`
TransactionHash string `xorm:"transaction_hash varchar(128) notnull"`
TransactionLength int `xorm:"tx_len int notnull"`
}
func (t Block) TableName() string {
return tableName("block")
func (t Ledger) TableName() string {
return tableName("ledger")
}
package ripple
import (
"fmt"
"time"
)
// TablePrefix is prepended to every physical table name in this package.
const (
	TablePrefix = "ripple"
)

// Meta is the per-table bookkeeping row: the last written row id and the
// running row count for each ripple table.
type Meta struct {
	ID          int64     `xorm:"id bigint autoincr pk"`
	Name        string    `xorm:"name varchar(255) notnull unique"`
	LastID      int64     `xorm:"last_id bigint notnull"`
	Count       int64     `xorm:"count bigint notnull"`
	CreatedTime time.Time `xorm:"created_time created notnull"`
	UpdatedTime time.Time `xorm:"updated_time updated notnull"`
}

// TableName returns the prefixed name of the meta table ("ripple_meta").
func (t Meta) TableName() string {
	return tableName("meta")
}

// tableName joins TablePrefix and s with an underscore; with an empty
// prefix it would return s unchanged.
func tableName(s string) string {
	if TablePrefix == "" {
		return s
	}
	return fmt.Sprintf("%s_%s", TablePrefix, s)
}
// TransactionCommonFields holds the columns shared by every ripple
// transaction table; the per-type models embed it via xorm "extends",
// so these columns are repeated in each transaction_* table.
type TransactionCommonFields struct {
	ID int64 `xorm:"id bigint autoincr pk"`
	// The type of transaction. Valid types include: Payment, OfferCreate, OfferCancel, TrustSet,
	// AccountSet, SetRegularKey, SignerListSet, EscrowCreate, EscrowFinish, EscrowCancel,
	// PaymentChannelCreate, PaymentChannelFund, PaymentChannelClaim, and DepositPreauth.
	TransactionType string `xorm:"transaction_type char(32) notnull index"`
	Account string `xorm:"account varchar(68) notnull index"`
	Hash string `xorm:"hash varchar(128) notnull index"`
	LedgerIndex int64 `xorm:"ledger_index bigint index"`
	Timestamp int64 `xorm:"timestamp int notnull index"`
	Fee int64 `xorm:"fee bigint notnull"` //in drops
	Sequence int64 `xorm:"sequence bigint notnull"`
	AccountTxnID string `xorm:"account_txn_id varchar(128) null"`
	Flags int64 `xorm:"flags bigint null"`
	LastLedgerSequence int64 `xorm:"last_ledger_sequence bigint null"`
	Memos string `xorm:"memos text null"`
	Signers string `xorm:"signers text null"`
	SourceTag int64 `xorm:"source_tag bigint null"`
	SigningPubKey string `xorm:"signing_pub_key varchar(128) null"`
	TxnSignature string `xorm:"txn_signature varchar(256) null"`
	// metadata — fields taken from the transaction's ledger metadata
	// rather than the submitted transaction itself.
	TransactionResult string `xorm:"transaction_result varchar(32) null"`
	TransactionIndex int `xorm:"transaction_index bigint null"`
	AffectedNodes string `xorm:"affected_nodes text null"`
	DeliveredAmount string `xorm:"delivered_amount text null"`
}
package ripple
// AccountSet maps the ripple AccountSet transaction type; shared columns
// come from the embedded TransactionCommonFields.
type AccountSet struct {
	TransactionCommonFields `xorm:"extends"`
	// Column fixed from "clear_flags" to "clear_flag": the field is
	// singular and every sibling table uses singular snake_case columns
	// (cf. the xrp Transaction model's clear_flag column).
	ClearFlag int64 `xorm:"clear_flag bigint null"`
	Domain string `xorm:"domain varchar(1024) null"`
	EmailHash string `xorm:"email_hash char(128) null"`
	MessageKey string `xorm:"message_key varchar(128) null"`
	SetFlag int64 `xorm:"set_flag bigint null"`
	TransferRate int64 `xorm:"transfer_rate bigint null"`
	TickSize int `xorm:"tick_size int null"`
}

// TableName returns the prefixed table name for account-set rows.
func (t AccountSet) TableName() string {
	return tableName("transaction_account_set")
}
package ripple
// DepositPreauth maps the ripple DepositPreauth transaction type; shared
// columns come from the embedded TransactionCommonFields.
type DepositPreauth struct {
	TransactionCommonFields `xorm:"extends"`
	Authorize string `xorm:"authorize varchar(68) null"`
	UnAuthorize string `xorm:"un_authorize varchar(68) null"`
}

// TableName returns the prefixed table name for deposit-preauth rows.
func (t DepositPreauth) TableName() string {
	return tableName("transaction_deposit_preauth")
}
package ripple
// EscrowCancel maps the ripple EscrowCancel transaction type; shared
// columns come from the embedded TransactionCommonFields.
type EscrowCancel struct {
	TransactionCommonFields `xorm:"extends"`
	Owner string `xorm:"owner varchar(68) null"`
	OfferSequence int64 `xorm:"offer_sequence bigint null"`
}

// TableName returns the prefixed table name for escrow-cancel rows.
func (t EscrowCancel) TableName() string {
	return tableName("transaction_escrow_cancel")
}
package ripple
// EscrowCreate maps the ripple EscrowCreate transaction type; shared
// columns come from the embedded TransactionCommonFields.
type EscrowCreate struct {
	TransactionCommonFields `xorm:"extends"`
	Amount string `xorm:"amount text notnull"`
	Destination string `xorm:"destination varchar(68) notnull"`
	CancelAfter int64 `xorm:"cancel_after bigint null"`
	FinishAfter int64 `xorm:"finish_after bigint null"`
	Condition string `xorm:"condition varchar(512) null"`
	DestinationTag int64 `xorm:"destination_tag bigint null"`
}

// TableName returns the prefixed table name for escrow-create rows.
func (t EscrowCreate) TableName() string {
	return tableName("transaction_escrow_create")
}
package ripple
// EscrowFinish maps the ripple EscrowFinish transaction type; shared
// columns come from the embedded TransactionCommonFields.
type EscrowFinish struct {
	TransactionCommonFields `xorm:"extends"`
	Owner string `xorm:"owner varchar(68) null"`
	OfferSequence int64 `xorm:"offer_sequence bigint null"`
	Condition string `xorm:"condition varchar(512) null"`
	Fulfillment string `xorm:"fulfillment varchar(512) null"`
}

// TableName returns the prefixed table name for escrow-finish rows.
func (t EscrowFinish) TableName() string {
	return tableName("transaction_escrow_finish")
}
package ripple
// OfferCancel maps the ripple OfferCancel transaction type; shared
// columns come from the embedded TransactionCommonFields.
type OfferCancel struct {
	TransactionCommonFields `xorm:"extends"`
	OfferSequence int64 `xorm:"offer_sequence bigint notnull"`
}

// TableName returns the prefixed table name for offer-cancel rows.
func (t OfferCancel) TableName() string {
	return tableName("transaction_offer_cancel")
}
package ripple
// OfferCreate maps the ripple OfferCreate transaction type; shared
// columns come from the embedded TransactionCommonFields.
type OfferCreate struct {
	TransactionCommonFields `xorm:"extends"`
	Expiration int64 `xorm:"expiration int null"`
	OfferSequence int64 `xorm:"offer_sequence bigint null"`
	TakerGets string `xorm:"taker_gets text notnull"`
	TakerPays string `xorm:"taker_pays text notnull"`
}

// TableName returns the prefixed table name for offer-create rows.
func (t OfferCreate) TableName() string {
	return tableName("transaction_offer_create")
}
package ripple
// Payment maps the ripple Payment transaction type; shared columns come
// from the embedded TransactionCommonFields. Amount/SendMax/DeliverMin
// are stored as raw text (they may be XRP drops or issued-currency
// objects in the source JSON).
type Payment struct {
	TransactionCommonFields `xorm:"extends"`
	Amount string `xorm:"amount text notnull"`
	Destination string `xorm:"destination varchar(68) notnull"`
	DestinationTag int64 `xorm:"destination_tag bigint null"`
	InvoiceID string `xorm:"invoice_id varchar(128) null"`
	Paths string `xorm:"paths text null"`
	SendMax string `xorm:"send_max text null"`
	DeliverMin string `xorm:"deliver_min text null"`
}

// TableName returns the prefixed table name for payment rows.
func (t Payment) TableName() string {
	return tableName("transaction_payment")
}
package ripple
// PaymentChannelClaim maps the ripple PaymentChannelClaim transaction
// type; shared columns come from the embedded TransactionCommonFields.
type PaymentChannelClaim struct {
	TransactionCommonFields `xorm:"extends"`
	Channel string `xorm:"channel char(64) notnull"`
	Amount string `xorm:"amount text null"`
	Balance string `xorm:"balance text null"`
	PublicKey string `xorm:"public_key char(66) null"`
	Signature string `xorm:"signature text null"`
}

// TableName returns the prefixed table name for payment-channel-claim rows.
func (t PaymentChannelClaim) TableName() string {
	return tableName("transaction_payment_channel_claim")
}
package ripple
// PaymentChannelCreate maps the ripple PaymentChannelCreate transaction
// type; shared columns come from the embedded TransactionCommonFields.
type PaymentChannelCreate struct {
	TransactionCommonFields `xorm:"extends"`
	Amount string `xorm:"amount text notnull"`
	Destination string `xorm:"destination varchar(68) notnull"`
	// Column fixed from camelCase "settleDelay" to "settle_delay" to
	// match the snake_case convention used by every other column
	// (cf. the xrp Transaction model's settle_delay column).
	SettleDelay int64 `xorm:"settle_delay bigint notnull"`
	PublicKey string `xorm:"public_key char(66) notnull"`
	CancelAfter int64 `xorm:"cancel_after bigint null"`
	DestinationTag int64 `xorm:"destination_tag bigint null"`
}

// TableName returns the prefixed table name for payment-channel-create rows.
func (t PaymentChannelCreate) TableName() string {
	return tableName("transaction_payment_channel_create")
}
package ripple
// PaymentChannelFund maps the ripple PaymentChannelFund transaction
// type; shared columns come from the embedded TransactionCommonFields.
type PaymentChannelFund struct {
	TransactionCommonFields `xorm:"extends"`
	Channel string `xorm:"channel char(64) notnull"`
	Amount string `xorm:"amount text notnull"`
	Expiration int64 `xorm:"expiration int null"`
}

// TableName returns the prefixed table name for payment-channel-fund rows.
func (t PaymentChannelFund) TableName() string {
	return tableName("transaction_payment_channel_fund")
}
package ripple
// SetRegularKey maps the ripple SetRegularKey transaction type; shared
// columns come from the embedded TransactionCommonFields.
type SetRegularKey struct {
	TransactionCommonFields `xorm:"extends"`
	RegularKey string `xorm:"regular_key varchar(68) null"`
}

// TableName returns the prefixed table name for set-regular-key rows.
func (t SetRegularKey) TableName() string {
	return tableName("transaction_set_regular_key")
}
package ripple
// SignerListSet maps the ripple SignerListSet transaction type; shared
// columns come from the embedded TransactionCommonFields.
type SignerListSet struct {
	TransactionCommonFields `xorm:"extends"`
	SignerQuorum int64 `xorm:"signer_quorum bigint notnull"`
	SignerEntries string `xorm:"signer_entries text null"`
}

// TableName returns the prefixed table name for signer-list-set rows.
// Bug fix: this previously returned "transaction_set_regular_key"
// (copy-paste from SetRegularKey), making two models share one table.
func (t SignerListSet) TableName() string {
	return tableName("transaction_signer_list_set")
}
package ripple
// TrustSet maps the ripple TrustSet transaction type; shared columns
// come from the embedded TransactionCommonFields.
type TrustSet struct {
	TransactionCommonFields `xorm:"extends"`
	LimitAmount string `xorm:"limit_amount text notnull"`
	QualityIn int64 `xorm:"quality_in bigint null"`
	QualityOut int64 `xorm:"quality_out bigint null"`
}

// TableName returns the prefixed table name for trust-set rows.
func (t TrustSet) TableName() string {
	return tableName("transaction_trust_set")
}
package xrp
import (
"github.com/jdcloud-bds/bds/common/math"
)
// Account is one row per observed XRP account (plus signer-entry
// relations, judging by the SignerWeight/SignerEntrie fields).
type Account struct {
	ID int64 `xorm:"id bigint autoincr pk"`
	Address string `xorm:"address char(34) notnull index"` // account address
	SignerWeight int `xorm:"signer_weight int null default '0'"`
	SignerEntrie string `xorm:"signer_entrie char(34) notnull index"`
	Type int `xorm:"type tinyint notnull default '0'"` // account type; meaning of the values not visible here — TODO confirm
	Balance math.HexOrDecimal256 `xorm:"balance decimal(38,0) notnull default '0'"` // account balance
	Creator string `xorm:"creator char(40) notnull default '' "`
	BirthTimestamp int64 `xorm:"birth_timestamp int notnull default '0' "` // timestamp when the account first appeared
	LastActiveTimestamp int64 `xorm:"last_active_timestamp int notnull default '0'"` // timestamp of the account's last activity
}

// TableName returns the prefixed account table name.
func (t Account) TableName() string {
	return tableName("account")
}
package xrp
// Amount stores a non-XRP (issued-currency) amount referenced by a
// transaction; ParentHash links it back to the owning transaction.
type Amount struct {
	ID int64 `xorm:"id bigint autoincr pk"`
	ParentHash string `xorm:"parent_hash char(64) notnull index"`
	LedgerIndex int64 `xorm:"ledger_index int index"`
	CloseTime int64 `xorm:"close_time int notnull"`
	Currency string `xorm:"currency char(8) notnull index"`
	Value string `xorm:"value decimal(38,4) notnull default '0'"`
	Issuer string `xorm:"issuer char(34) notnull index"`
	AmountType int `xorm:"amount_type int notnull"`
	// AmountType values:
	//1:Payment Amount 2:Payment SendMax 3:Payment DeliverMin
	//4: OfferCreate TakerGets 5:OfferCreate TakerPay
	//6: TrustSet LimitAmount
}

// TableName returns the prefixed amount table name.
func (t Amount) TableName() string {
	return tableName("amount")
}
package xrp
import (
"fmt"
"time"
)
// TablePrefix is prepended to every physical table name in this package.
const (
	TablePrefix = "xrp"
)

// Meta is the per-table bookkeeping row: the last written row id and the
// running row count for each xrp table.
type Meta struct {
	ID          int64     `xorm:"id bigint autoincr pk"`
	Name        string    `xorm:"name varchar(255) notnull unique"`
	LastID      int64     `xorm:"last_id bigint notnull"`
	Count       int64     `xorm:"count bigint notnull"`
	CreatedTime time.Time `xorm:"created_time created notnull"`
	UpdatedTime time.Time `xorm:"updated_time updated notnull"`
}

// TableName returns the prefixed name of the meta table ("xrp_meta").
func (t Meta) TableName() string {
	return tableName("meta")
}

// tableName joins TablePrefix and s with an underscore; with an empty
// prefix it would return s unchanged.
func tableName(s string) string {
	if TablePrefix == "" {
		return s
	}
	return fmt.Sprintf("%s_%s", TablePrefix, s)
}
package xrp
// Path stores one step of a Payment's paths array; ParentHash links it
// back to the owning transaction, and InTxIndex/InPathIndex record its
// position within that transaction's path set.
type Path struct {
	ID int64 `xorm:"id bigint autoincr pk"`
	ParentHash string `xorm:"parent_hash char(64) notnull index"`
	LedgerIndex int64 `xorm:"ledger_index int index"`
	CloseTime int64 `xorm:"close_time int notnull"`
	Currency string `xorm:"currency char(8) notnull index"`
	Issuer string `xorm:"issuer char(34) notnull index"`
	Type int64 `xorm:"type int null"`
	Account string `xorm:"account char(34) null index"`
	InTxIndex int64 `xorm:"in_tx_index int"`
	InPathIndex int64 `xorm:"in_path_index int"`
}

// TableName returns the prefixed path table name.
func (t Path) TableName() string {
	return tableName("path")
}
package xrp
// Transaction is the xrp package's single wide table for all ripple
// transaction types: common columns first, then per-type column groups
// (see the section comments below). Non-XRP amounts are stored as -1
// here and broken out into the Amount table.
type Transaction struct {
	ID int64 `xorm:"id bigint autoincr pk"`
	Account string `xorm:"account varchar(68) notnull index"`
	TransactionType string `xorm:"transaction_type char(30) notnull index"`
	//The type of transaction. Valid types include: Payment, OfferCreate, OfferCancel, TrustSet,
	// AccountSet, SetRegularKey, SignerListSet, EscrowCreate, EscrowFinish, EscrowCancel,
	// PaymentChannelCreate, PaymentChannelFund, PaymentChannelClaim, and DepositPreauth.
	Fee int64 `xorm:"fee bigint notnull"` //in drops
	Sequence int64 `xorm:"sequence bigint notnull"`
	AccountTxnID string `xorm:"account_txn_id varchar(128) null"`
	Flags int64 `xorm:"flags bigint null"`
	LastLedgerSequence int64 `xorm:"last_ledger_sequence bigint null"`
	Memos string `xorm:"memos varchar(1024) null"`
	Signers string `xorm:"signers varchar(2048) null"`
	SourceTag int64 `xorm:"source_tag bigint null"`
	SigningPubKey string `xorm:"signing_pub_key varchar(132) null"`
	TxnSignature string `xorm:"txn_signature varchar(380) null"`
	Hash string `xorm:"hash varchar(128) notnull index"`
	LedgerIndex int64 `xorm:"ledger_index bigint index"`
	AffectedNodesLen int `xorm:"affected_nodes_len bigint null"`
	TransactionResult string `xorm:"transaction_result varchar(30) null"`
	TransactionIndex int `xorm:"transaction_index bigint null"`
	Validated int `xorm:"validated tinyint null"`
	Date int64 `xorm:"date bigint null"`
	//additional
	CloseTime int64 `xorm:"close_time bigint notnull"`
	//Payment
	Amount int64 `xorm:"amount bigint notnull"` //if xrp, then in drops, else be -1 and ref: fk-Amount
	Destination string `xorm:"destination varchar(68) null"`
	DestinationTag int64 `xorm:"destination_tag bigint null"`
	InvoiceID string `xorm:"invoice_id varchar(128) null"`
	//Pathes fk
	PathesLen int `xorm:"pathes_len bigint null"`
	SendMax int64 `xorm:"send_max bigint notnull"` //if xrp, then in drops, else be -1 and ref: fk-Amount
	DeliverMin int64 `xorm:"deliver_min bigint notnull"` //if xrp, then in drops, else be -1 and ref: fk-Amount
	//OfferCreate OfferCancel
	Expiration int64 `xorm:"expiration bigint null"`
	OfferSequence int `xorm:"offer_sequence bigint null"`
	TakerGets int64 `xorm:"taker_gets bigint null"` //if xrp, then in drops, else be -1 and ref: fk-Amount
	TakerPays int64 `xorm:"taker_pays bigint null"` //if xrp, then in drops, else be -1 and ref: fk-Amount
	//TrustSet
	LimitAmount int64 `xorm:"limit_amount bigint null"` //if xrp, then in drops, else be -1 and ref: fk-Amount
	QualityIn int64 `xorm:"quality_in bigint null"`
	QualityOut int64 `xorm:"quality_out bigint null"`
	//AccountSet
	ClearFlag int `xorm:"clear_flag bigint null"`
	Domain string `xorm:"domain varchar(512) null"`
	EmailHash string `xorm:"email_hash char(64) null"`
	MessageKey string `xorm:"message_key varchar(68) null"`
	SetFlag int `xorm:"set_flag bigint null"`
	TransferRate int `xorm:"transfer_rate bigint null"`
	TickSize int `xorm:"tick_size bigint null"`
	//WalletLocator WalletSize: not used
	//SetRegularKey
	RegularKey string `xorm:"regular_key varchar(68) null"`
	//SignerListSet
	SignerQuorum int `xorm:"signer_quorum bigint null"`
	//EscrowCreate
	CancelAfter int64 `xorm:"cancel_after bigint null"`
	FinishAfter int64 `xorm:"finish_after bigint null"`
	Condition string `xorm:"condition varchar(512) null"`
	//EscrowFinish
	Owner string `xorm:"owner varchar(68) null"`
	Fulfillment string `xorm:"fulfillment varchar(512) null"`
	//EscrowCancel
	//PaymentChannelCreate
	SettleDelay int64 `xorm:"settle_delay bigint null"`
	PublicKey string `xorm:"public_key char(66) null"`
	//PaymentChannelFund
	Channel string `xorm:"channel char(64) null"`
	//PaymentChannelClaim
	Balance int64 `xorm:"balance bigint null"`
	//DepositPreauth
	Authorize string `xorm:"authorize varchar(68) null"`
	UnAuthorize string `xorm:"un_authorize varchar(68) null"`
}

// TableName returns the prefixed transaction table name.
func (t Transaction) TableName() string {
	return tableName("transaction")
}
package xrp
// AffectedNodes stores one entry of a transaction's metadata
// AffectedNodes array; ParentHash links it back to the transaction and
// FullJsonStr keeps the raw JSON of the node.
type AffectedNodes struct {
	ID int64 `xorm:"id bigint autoincr pk"`
	ParentHash string `xorm:"parent_hash char(64) notnull index"`
	LedgerIndex int64 `xorm:"ledger_index int index"`
	CloseTime int64 `xorm:"close_time int notnull"`
	NodeType string `xorm:"node_type char(20) notnull index"`
	LedgerEntryType string `xorm:"ledger_entry_type char(20) null index"`
	// Tag fixed from "not null" (two tokens, not recognized by xorm) to
	// the xorm keyword "notnull". NOTE(review): the field maps to a
	// column named "hash" — confirm that is intentional.
	NodeLedgerIndex string `xorm:"hash char(64) notnull"`
	PreviousTxnID string `xorm:"previous_txn_id char(64) null"`
	PreviousTxnLgrSeq int64 `xorm:"previous_txn_lgr_seq int null"`
	FullJsonStr string `xorm:"full_json_str varchar(1024) null"`
}

// TableName returns the prefixed affected-nodes table name.
func (t AffectedNodes) TableName() string {
	return tableName("transaction_affected_nodes")
}
package xrp
package ripple
import (
"errors"
"fmt"
"github.com/jdcloud-bds/bds/common/json"
"github.com/jdcloud-bds/bds/common/jsonrpc"
"github.com/jdcloud-bds/bds/common/log"
"github.com/jdcloud-bds/bds/common/math"
"math/big"
"strconv"
"strings"
)
......@@ -22,60 +19,6 @@ func newRPCHandler(c *jsonrpc.Client) (*rpcHandler, error) {
return h, nil
}
func (h *rpcHandler) GetBlockNumber() (int64, error) {
defer stats.Add(MetricRPCCall, 1)
data, err := h.client.Call("xrp_blockNumber")
if err != nil {
return 0, err
}
v := json.GetBytes(data, "result").String()
if len(v) == 0 {
return 0, errors.New("cannot get block number")
}
number, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return 0, err
}
return number, nil
}
func (h *rpcHandler) GetBalance(address string, height int64) (*big.Int, error) {
defer stats.Add(MetricRPCCall, 1)
if !strings.HasPrefix(address, "0x") {
address = fmt.Sprintf("0x%s", address)
}
var hexNumber string
if height == int64(0) {
hexNumber = "latest"
} else {
hexNumber = fmt.Sprintf("%#x", height)
}
data, err := h.client.Call("xrp_getBalance", address, ""+hexNumber+"")
if err != nil {
log.DetailError(err)
return nil, err
}
tmp := json.GetBytes(data, "result").String()
value, err := math.ParseInt256(tmp)
if err != nil {
log.DetailError(err)
return nil, err
}
return value, nil
}
func (h *rpcHandler) SendBlock(number int64) error {
defer stats.Add(MetricRPCCall, 1)
hexNumber := fmt.Sprintf("%#x", number)
_, err := h.client.Call("xrp_sendBlockByNumber", hexNumber, true)
if err != nil {
return err
}
return nil
}
type CompleteLedgers struct {
startLedger int64
endLedger int64
......@@ -89,20 +32,20 @@ func (h *rpcHandler) GetCompleteLedgers() (map[int]*CompleteLedgers, error) {
return nil, err
}
data := string(res)
cl_str := json.Get(data, "result.info.complete_ledgers").String()
//cl_str demo:"47025320,47025422-47025425,47025527-47025528,47025629-47025661,47025763,47025864-47025877"
log.Info("splitter xrp: get completed ledgers: %s\n\n", cl_str)
if strings.ToLower(cl_str) == "empty" {
clStr := json.Get(data, "result.info.complete_ledgers").String()
//clStr demo:"47025320,47025422-47025425,47025527-47025528,47025629-47025661,47025763,47025864-47025877"
log.Info("splitter ripple: get completed ledgers: %s\n\n", clStr)
if strings.ToLower(clStr) == "empty" {
return nil, nil
}
cl_arr := strings.Split(cl_str, ",")
for i, v := range cl_arr {
clList := strings.Split(clStr, ",")
for i, v := range clList {
cl := new(CompleteLedgers)
ind := strings.Index(v, "-")
if ind > 0 {
cl.startLedger, _ = strconv.ParseInt(v[:ind], 10, 64)
cl.endLedger, _ = strconv.ParseInt(v[ind+1:], 10, 64)
index := strings.Index(v, "-")
if index > 0 {
cl.startLedger, _ = strconv.ParseInt(v[:index], 10, 64)
cl.endLedger, _ = strconv.ParseInt(v[index+1:], 10, 64)
} else {
cl.startLedger, _ = strconv.ParseInt(v, 10, 64)
cl.endLedger, _ = strconv.ParseInt(v, 10, 64)
......@@ -111,16 +54,26 @@ func (h *rpcHandler) GetCompleteLedgers() (map[int]*CompleteLedgers, error) {
}
return totalCompleteLedgers, nil
}
func (h *rpcHandler) SendBatchBlock(startNumber, endNumber int64) error {
func (h *rpcHandler) SendBatchLedger(start, end int64) error {
defer stats.Add(MetricRPCCall, 1)
params := make(map[string]interface{}, 0)
params["start_ledger_index"] = startNumber
params["end_ledger_index"] = endNumber
params := make(map[string]int64, 0)
params[ParamStartLedgerIndex] = start
params[ParamEndLedgerIndex] = end
res, err := h.client.CallXRP("send_batch_ledger", params)
log.Info("splitter xrp: send batch ledger res: %s", string(res))
result, err := h.client.CallXRP("send_batch_ledger", params)
log.Debug("splitter ripple: send batch ledger result: %s", string(result))
if err != nil {
return err
}
status := json.Get(string(result), "result.status").String()
errorMessage := json.Get(string(result), "result.error_message").String()
if status == "error" {
log.Error("splitter ripple: send batch ledger result error : %s", errorMessage)
log.DetailDebug("splitter ripple: send batch ledger result error : %s", errorMessage)
return errors.New("result status is error")
}
return nil
}
package ripple
import (
"github.com/jdcloud-bds/bds/common/httputils"
"github.com/jdcloud-bds/bds/common/jsonrpc"
"testing"
)
// TestSendBatchLedger sends a single-ledger batch request (start == end)
// to a rippled node.
// NOTE(review): integration test — it depends on the hardcoded endpoint
// below being reachable and will fail offline; consider gating it or
// reading the endpoint from an environment variable.
func TestSendBatchLedger(t *testing.T) {
	httpClient := httputils.NewRestClientWithAuthentication(nil)
	remoteHandler, err := newRPCHandler(jsonrpc.New(httpClient, "http://116.196.114.8:5555"))
	if err != nil {
		t.Fatal(err)
	}
	err = remoteHandler.SendBatchLedger(50588462, 50588462)
	if err != nil {
		t.Fatal(err)
	}
	t.Log("over")
}
// TestGetCompleteLedgers fetches and logs the node's complete-ledgers
// ranges.
// NOTE(review): integration test — it depends on the hardcoded endpoint
// below being reachable and will fail offline.
func TestGetCompleteLedgers(t *testing.T) {
	httpClient := httputils.NewRestClientWithAuthentication(nil)
	remoteHandler, err := newRPCHandler(jsonrpc.New(httpClient, "http://116.196.114.8:5555"))
	if err != nil {
		t.Fatal(err)
	}
	result, err := remoteHandler.GetCompleteLedgers()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("complete ledgers result : %v", result)
}
package xrp
package ripple
import (
"fmt"
......@@ -41,7 +41,7 @@ func (j *updateMetaDataJob) run() error {
metas := make([]*model.Meta, 0)
err := db.Find(&metas)
if err != nil {
log.Error("worker xrp: job '%s' get table list from meta error", j.name)
log.Error("ripple job : '%s' get table list from meta error", j.name)
return err
}
......@@ -58,7 +58,7 @@ func (j *updateMetaDataJob) run() error {
}
result, err := db.QueryString(countSql)
if err != nil {
log.Error("worker xrp: job %s get table %s count from meta error", j.name, meta.Name)
log.Error("ripple job : %s get table %s count from meta error", j.name, meta.Name)
log.DetailError(err)
continue
}
......@@ -70,7 +70,7 @@ func (j *updateMetaDataJob) run() error {
sql := db.Table(meta.Name).Cols("id").Desc("id").Limit(1, 0)
result, err = sql.QueryString()
if err != nil {
log.Error("worker xrp: job '%s' get table %s id from meta error", j.name, meta.Name)
log.Error("ripple job : '%s' get table %s id from meta error", j.name, meta.Name)
log.DetailError(err)
continue
}
......@@ -80,7 +80,7 @@ func (j *updateMetaDataJob) run() error {
data.Count = count
_, err = db.Update(data, cond)
if err != nil {
log.Error("worker xrp: job '%s' update table %s meta error", j.name, meta.Name)
log.Error("ripple job : '%s' update table %s meta error", j.name, meta.Name)
log.DetailError(err)
continue
}
......@@ -88,142 +88,72 @@ func (j *updateMetaDataJob) run() error {
}
}
stats.Add(MetricCronWorkerJobUpdateMetaData, 1)
elaspedTime := time.Now().Sub(startTime)
log.Debug("worker xrp: job '%s' elasped time %s", j.name, elaspedTime.String())
elapsedTime := time.Now().Sub(startTime)
log.Debug("ripple job : '%s' elapsed time %s", j.name, elapsedTime.String())
return nil
}
type getBatchBlockJob struct {
type getBatchLedgerJob struct {
splitter *XRPSplitter
name string
}
func newGetBatchBlockJob(splitter *XRPSplitter) *getBatchBlockJob {
j := new(getBatchBlockJob)
func newGetBatchLedgerJob(splitter *XRPSplitter) *getBatchLedgerJob {
j := new(getBatchLedgerJob)
j.splitter = splitter
j.name = "'get batch block'"
j.name = "'get batch ledger'"
return j
}
func (j *getBatchBlockJob) Run() {
func (j *getBatchLedgerJob) Run() {
_ = j.run()
}
func (j *getBatchBlockJob) Name() string {
func (j *getBatchLedgerJob) Name() string {
return j.name
}
func (j *getBatchBlockJob) run0() error {
func (j *getBatchLedgerJob) run() error {
startTime := time.Now()
db := service.NewDatabase(j.splitter.cfg.Engine)
totalCompleteLedgers, err := j.splitter.remoteHandler.GetCompleteLedgers()
if err != nil {
log.Error("worker xrp: job %s get closed ledgers error.", j.name)
log.Error("worker xrp: %s", err.Error())
}
batchSize := int64(1000)
ledgerList := make(map[int64]bool, 0)
sql := "select ledger_index from xrp_block order by ledger_index asc"
result, err := db.QueryString(sql)
if err != nil {
log.Error("worker xrp: get ledger_index from database error")
log.DetailError(err)
log.Error("ripple job : %s get closed ledgers error", j.name)
log.DetailError("ripple job error : %s", err.Error())
return err
}
for _, v := range result {
ledger_index, _ := strconv.ParseInt(v["ledger_index"], 10, 64)
ledgerList[ledger_index] = true
}
for _, cl := range totalCompleteLedgers {
missedLedger := make(map[int64]bool, 0)
i := int64(0)
sendStart := cl.endLedger
sendEnd := cl.startLedger
for k := cl.startLedger; k <= cl.endLedger; k++ {
if _, ok := ledgerList[k]; !ok {
missedLedger[k] = true
if k < sendStart {
sendStart = k
}
if k > sendEnd {
sendEnd = k
}
}
i++
if i >= batchSize {
if sendEnd >= sendStart && len(missedLedger) > 0 {
log.Info("splitter xrp: send batch block from %d to %d.", sendStart, sendEnd)
err = j.splitter.remoteHandler.SendBatchBlock(sendStart, sendEnd)
if err != nil {
log.Error("splitter xrp: send batch block error: %s", err.Error())
return err
}
}
missedLedger = make(map[int64]bool, 0)
i = 0
sendStart = cl.endLedger
sendEnd = k + 1
}
}
if len(missedLedger) > 0 {
if sendEnd >= sendStart {
log.Info("splitter xrp: send batch block from %d to %d.", sendStart, sendEnd)
err = j.splitter.remoteHandler.SendBatchBlock(sendStart, sendEnd)
if err != nil {
log.Error("splitter xrp: send batch block error: %s", err.Error())
return err
}
}
}
}
elaspedTime := time.Now().Sub(startTime)
log.Debug("worker xrp: job '%s' elasped time %s", j.name, elaspedTime.String())
return nil
}
func (j *getBatchBlockJob) run() error {
startTime := time.Now()
db := service.NewDatabase(j.splitter.cfg.Engine)
totalCompleteLedgers, err := j.splitter.remoteHandler.GetCompleteLedgers()
if err != nil {
log.Error("worker xrp: job %s get closed ledgers error\n\n", j.name)
log.Error("worker xrp: %s", err.Error())
}
batchSize := int64(1000)
batchSize := 1000
ledgerList := make(map[int64]bool, 0)
sql := "select ledger_index from xrp_block order by ledger_index asc"
sql := "select ledger_index from ripple_ledger order by ledger_index asc"
result, err := db.QueryString(sql)
if err != nil {
log.Error("worker xrp: get ledger_index from database error")
log.Error("ripple job : get ledger_index from database error")
log.DetailError(err)
return err
}
for _, v := range result {
ledger_index, _ := strconv.ParseInt(v["ledger_index"], 10, 64)
ledgerList[ledger_index] = true
ledgerIndex, _ := strconv.ParseInt(v["ledger_index"], 10, 64)
ledgerList[ledgerIndex] = true
}
for _, cl := range totalCompleteLedgers {
missedLedger := make(map[int64]bool, 0)
i := int64(0)
sendStart := int64(0)
sendEnd := int64(0)
countStart := false
i := 0
start := int64(0)
end := int64(0)
flag := false
for k := cl.startLedger; k <= cl.endLedger; k++ {
if _, ok := ledgerList[k]; !ok {
if !countStart {
sendStart = k
sendEnd = k
countStart = true
if !flag {
start = k
end = k
flag = true
} else {
sendEnd = k
end = k
}
missedLedger[k] = true
i++
} else if countStart {
} else if flag {
break
}
......@@ -231,20 +161,18 @@ func (j *getBatchBlockJob) run() error {
break
}
}
if len(missedLedger) > 0 {
if sendEnd >= sendStart {
log.Info("splitter xrp: send batch block from %d to %d.", sendStart, sendEnd)
err = j.splitter.remoteHandler.SendBatchBlock(sendStart, sendEnd)
if err != nil {
log.Error("splitter xrp: send batch block error: %s", err.Error())
return err
}
if len(missedLedger) > 0 && end >= start {
log.Info("splitter ripple: send batch ledger from %d to %d.", start, end)
err = j.splitter.remoteHandler.SendBatchLedger(start, end)
if err != nil {
log.Error("splitter ripple: send batch ledger error: %s", err.Error())
log.DetailError(err)
return err
}
}
}
elaspedTime := time.Now().Sub(startTime)
log.Debug("worker xrp: job '%s' elasped time %s", j.name, elaspedTime.String())
elapsedTime := time.Now().Sub(startTime)
log.Debug("ripple job : '%s' elapsed time %s", j.name, elapsedTime.String())
return nil
}
package ripple
import (
"github.com/jdcloud-bds/bds/common/metric"
model "github.com/jdcloud-bds/bds/service/model/ripple"
"math/big"
)
// Metric names reported by the ripple splitter, plus the query-parameter
// keys used when requesting a ledger range from the remote node.
const (
	MetricReceiveMessages             = "receive_messages"
	MetricParseDataError              = "parse_data_error"
	// NOTE(review): "Vaildation" is a long-standing identifier typo; it is
	// kept because these names are referenced across the package. The
	// emitted metric strings are spelled correctly.
	MetricVaildationSuccess           = "validation_success"
	MetricVaildationError             = "validation_error"
	MetricDatabaseRollback            = "database_rollback"
	MetricDatabaseCommit              = "database_commit"
	MetricCronWorkerJob               = "cron_worker_job"
	MetricCronWorkerJobUpdateMetaData = "cron_worker_job_update_meta_data"
	MetricCronWorkerJobGetBatchLedger = "cron_worker_job_get_batch_ledger"
	MetricRPCCall                     = "rpc_call"

	// Parameter names for batch-ledger fetch requests.
	ParamStartLedgerIndex = "start_ledger_index"
	ParamEndLedgerIndex   = "end_ledger_index"
)
// XRP Ledger transaction type identifiers, exactly as they appear in the
// "TransactionType" field of a transaction JSON object. ParseLedger switches
// on these to route each transaction into its typed model slice.
const (
	AccountSet           = "AccountSet"
	DepositPreauth       = "DepositPreauth"
	EscrowCancel         = "EscrowCancel"
	EscrowCreate         = "EscrowCreate"
	EscrowFinish         = "EscrowFinish"
	OfferCancel          = "OfferCancel"
	OfferCreate          = "OfferCreate"
	Payment              = "Payment"
	PaymentChannelClaim  = "PaymentChannelClaim"
	PaymentChannelCreate = "PaymentChannelCreate"
	PaymentChannelFund   = "PaymentChannelFund"
	SetRegularKey        = "SetRegularKey"
	SignerListSet        = "SignerListSet"
	TrustSet             = "TrustSet"
)
var (
	// stats accumulates splitter metrics under the "ripple" namespace.
	stats = metric.NewMap("ripple")
	// maxBigNumber is the exclusive upper bound accepted by parseBigInt
	// (10^38); values at or above it are replaced by defaultBigNumber,
	// presumably to fit the database column — TODO confirm.
	maxBigNumber, _ = new(big.Int).SetString("100000000000000000000000000000000000000", 10)
	// defaultBigNumber (-1) is the sentinel stored when a big-integer value
	// cannot be parsed or exceeds maxBigNumber.
	defaultBigNumber, _ = new(big.Int).SetString("-1", 10)
)
// XRPLedgerData is one fully parsed XRP ledger: the ledger header plus the
// ledger's transactions grouped into one slice per transaction type, ready
// for batch insertion by the database workers.
type XRPLedgerData struct {
	Ledger                *model.Ledger
	AccountSets           []*model.AccountSet
	DepositPreauths       []*model.DepositPreauth
	EscrowCancels         []*model.EscrowCancel
	EscrowCreates         []*model.EscrowCreate
	EscrowFinishes        []*model.EscrowFinish
	OfferCancels          []*model.OfferCancel
	OfferCreates          []*model.OfferCreate
	Payments              []*model.Payment
	PaymentChannelClaims  []*model.PaymentChannelClaim
	PaymentChannelCreates []*model.PaymentChannelCreate
	PaymentChannelFunds   []*model.PaymentChannelFund
	SetRegularKeys        []*model.SetRegularKey
	SignerListSets        []*model.SignerListSet
	TrustSets             []*model.TrustSet
}
package ripple
import (
"github.com/jdcloud-bds/bds/common/json"
"github.com/jdcloud-bds/bds/common/log"
"github.com/jdcloud-bds/bds/common/math"
model "github.com/jdcloud-bds/bds/service/model/ripple"
"time"
)
// parseTransactionCommonFields populates cf with the fields shared by every
// XRP transaction type, read from the transaction JSON document in input.
// index and timestamp are the enclosing ledger's index and close timestamp
// and are copied onto the row for denormalized querying.
func parseTransactionCommonFields(cf *model.TransactionCommonFields, input string, index, timestamp int64) {
	// Context supplied by the caller rather than read from the JSON.
	cf.ID = 0
	cf.LedgerIndex = index
	cf.Timestamp = timestamp

	// Fields taken verbatim from the transaction body.
	cf.TransactionType = json.Get(input, "TransactionType").String()
	cf.Account = json.Get(input, "Account").String()
	cf.Hash = json.Get(input, "hash").String()
	cf.Fee = json.Get(input, "Fee").Int()
	cf.Sequence = json.Get(input, "Sequence").Int()
	cf.AccountTxnID = json.Get(input, "AccountTxnID").String()
	cf.Flags = json.Get(input, "Flags").Int()
	cf.LastLedgerSequence = json.Get(input, "LastLedgerSequence").Int()
	cf.Memos = json.Get(input, "Memos").String()
	cf.Signers = json.Get(input, "Signers").String()
	cf.SourceTag = json.Get(input, "SourceTag").Int()
	cf.SigningPubKey = json.Get(input, "SigningPubKey").String()
	cf.TxnSignature = json.Get(input, "TxnSignature").String()

	// Execution results recorded in the transaction metadata.
	cf.TransactionResult = json.Get(input, "metaData.TransactionResult").String()
	cf.TransactionIndex = int(json.Get(input, "metaData.TransactionIndex").Int())
	cf.AffectedNodes = json.Get(input, "metaData.AffectedNodes").String()
	cf.DeliveredAmount = json.Get(input, "metaData.DeliveredAmount").String()
}
// ParseLedger parses one XRP ledger JSON document (as delivered over Kafka)
// into an XRPLedgerData: the ledger header plus one typed model slice per
// supported transaction type. Transactions whose TransactionType is not one
// of the known constants are silently skipped. It returns an error only when
// the ledger's total_coins value cannot be parsed.
func ParseLedger(data string) (*XRPLedgerData, error) {
	startTime := time.Now()
	var err error
	b := new(XRPLedgerData)
	b.Ledger = new(model.Ledger)
	b.AccountSets = make([]*model.AccountSet, 0)
	b.DepositPreauths = make([]*model.DepositPreauth, 0)
	b.EscrowCancels = make([]*model.EscrowCancel, 0)
	b.EscrowCreates = make([]*model.EscrowCreate, 0)
	b.EscrowFinishes = make([]*model.EscrowFinish, 0)
	b.OfferCancels = make([]*model.OfferCancel, 0)
	b.OfferCreates = make([]*model.OfferCreate, 0)
	b.Payments = make([]*model.Payment, 0)
	b.PaymentChannelClaims = make([]*model.PaymentChannelClaim, 0)
	b.PaymentChannelCreates = make([]*model.PaymentChannelCreate, 0)
	b.PaymentChannelFunds = make([]*model.PaymentChannelFund, 0)
	b.SetRegularKeys = make([]*model.SetRegularKey, 0)
	b.SignerListSets = make([]*model.SignerListSet, 0)
	b.TrustSets = make([]*model.TrustSet, 0)
	// Ledger header fields.
	b.Ledger.Accepted = int(json.Get(data, "accepted").Int())
	b.Ledger.AccountHash = json.Get(data, "account_hash").String()
	b.Ledger.CloseFlags = int(json.Get(data, "close_flags").Int())
	// close_time is seconds since the Ripple epoch (2000-01-01); read it once
	// and derive both columns from it.
	closeTime := json.Get(data, "close_time").Int()
	b.Ledger.CloseTime = closeTime
	// NOTE(review): the Ripple epoch offset from Unix time is 946684800s
	// (2000-01-01 UTC); 946656000 is 8h less, presumably shifting into
	// Asia/Shanghai to match the database timezone — TODO confirm.
	b.Ledger.Timestamp = closeTime + 946656000
	b.Ledger.CloseTimeHuman = json.Get(data, "close_time_human").String()
	b.Ledger.CloseTimeResolution = int(json.Get(data, "close_time_resolution").Int())
	b.Ledger.Closed = int(json.Get(data, "closed").Int())
	b.Ledger.Hash = json.Get(data, "hash").String()
	b.Ledger.LedgerHash = json.Get(data, "ledger_hash").String()
	b.Ledger.LedgerIndex = json.Get(data, "ledger_index").Int()
	b.Ledger.ParentCloseTime = json.Get(data, "parent_close_time").Int()
	b.Ledger.ParentHash = json.Get(data, "parent_hash").String()
	b.Ledger.SeqNum = json.Get(data, "seqNum").Int()
	totalCoins := json.Get(data, "total_coins").String()
	b.Ledger.TotalCoins, err = parseBigInt(totalCoins)
	if err != nil {
		log.Error("splitter ripple: Ledger %d TotalCoins '%s' parse error", b.Ledger.LedgerIndex, totalCoins)
		return nil, err
	}
	b.Ledger.TransactionHash = json.Get(data, "transaction_hash").String()
	transactionList := json.Get(data, "transactions").Array()
	for _, transaction := range transactionList {
		// Render the transaction JSON once per iteration; the original code
		// re-rendered it for every single field lookup.
		t := transaction.String()
		transactionType := json.Get(t, "TransactionType").String()
		switch transactionType {
		case AccountSet:
			accountSet := new(model.AccountSet)
			parseTransactionCommonFields(&accountSet.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			accountSet.ClearFlag = json.Get(t, "ClearFlag").Int()
			accountSet.Domain = json.Get(t, "Domain").String()
			accountSet.EmailHash = json.Get(t, "EmailHash").String()
			accountSet.MessageKey = json.Get(t, "MessageKey").String()
			accountSet.SetFlag = json.Get(t, "SetFlag").Int()
			accountSet.TransferRate = json.Get(t, "TransferRate").Int()
			b.AccountSets = append(b.AccountSets, accountSet)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, accountSet.Hash)
		case DepositPreauth:
			depositPreauth := new(model.DepositPreauth)
			parseTransactionCommonFields(&depositPreauth.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			depositPreauth.Authorize = json.Get(t, "Authorize").String()
			depositPreauth.UnAuthorize = json.Get(t, "Unauthorize").String()
			b.DepositPreauths = append(b.DepositPreauths, depositPreauth)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, depositPreauth.Hash)
		case EscrowCancel:
			escrowCancel := new(model.EscrowCancel)
			parseTransactionCommonFields(&escrowCancel.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			escrowCancel.Owner = json.Get(t, "Owner").String()
			escrowCancel.OfferSequence = json.Get(t, "OfferSequence").Int()
			b.EscrowCancels = append(b.EscrowCancels, escrowCancel)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, escrowCancel.Hash)
		case EscrowCreate:
			escrowCreate := new(model.EscrowCreate)
			parseTransactionCommonFields(&escrowCreate.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			escrowCreate.Amount = json.Get(t, "Amount").String()
			escrowCreate.Destination = json.Get(t, "Destination").String()
			escrowCreate.CancelAfter = json.Get(t, "CancelAfter").Int()
			escrowCreate.FinishAfter = json.Get(t, "FinishAfter").Int()
			escrowCreate.Condition = json.Get(t, "Condition").String()
			escrowCreate.DestinationTag = json.Get(t, "DestinationTag").Int()
			b.EscrowCreates = append(b.EscrowCreates, escrowCreate)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, escrowCreate.Hash)
		case EscrowFinish:
			escrowFinish := new(model.EscrowFinish)
			parseTransactionCommonFields(&escrowFinish.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			escrowFinish.Owner = json.Get(t, "Owner").String()
			escrowFinish.OfferSequence = json.Get(t, "OfferSequence").Int()
			escrowFinish.Condition = json.Get(t, "Condition").String()
			escrowFinish.Fulfillment = json.Get(t, "Fulfillment").String()
			b.EscrowFinishes = append(b.EscrowFinishes, escrowFinish)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, escrowFinish.Hash)
		case OfferCancel:
			offerCancel := new(model.OfferCancel)
			parseTransactionCommonFields(&offerCancel.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			offerCancel.OfferSequence = json.Get(t, "OfferSequence").Int()
			b.OfferCancels = append(b.OfferCancels, offerCancel)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, offerCancel.Hash)
		case OfferCreate:
			offerCreate := new(model.OfferCreate)
			parseTransactionCommonFields(&offerCreate.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			offerCreate.Expiration = json.Get(t, "Expiration").Int()
			offerCreate.OfferSequence = json.Get(t, "OfferSequence").Int()
			offerCreate.TakerGets = json.Get(t, "TakerGets").String()
			offerCreate.TakerPays = json.Get(t, "TakerPays").String()
			b.OfferCreates = append(b.OfferCreates, offerCreate)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, offerCreate.Hash)
		case Payment:
			payment := new(model.Payment)
			parseTransactionCommonFields(&payment.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			payment.Amount = json.Get(t, "Amount").String()
			payment.Destination = json.Get(t, "Destination").String()
			payment.DestinationTag = json.Get(t, "DestinationTag").Int()
			payment.InvoiceID = json.Get(t, "InvoiceID").String()
			payment.Paths = json.Get(t, "Paths").String()
			payment.SendMax = json.Get(t, "SendMax").String()
			payment.DeliverMin = json.Get(t, "DeliverMin").String()
			b.Payments = append(b.Payments, payment)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, payment.Hash)
		case PaymentChannelClaim:
			paymentChannelClaim := new(model.PaymentChannelClaim)
			parseTransactionCommonFields(&paymentChannelClaim.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			paymentChannelClaim.Amount = json.Get(t, "Amount").String()
			paymentChannelClaim.Channel = json.Get(t, "Channel").String()
			paymentChannelClaim.Balance = json.Get(t, "Balance").String()
			paymentChannelClaim.PublicKey = json.Get(t, "PublicKey").String()
			paymentChannelClaim.Signature = json.Get(t, "Signature").String()
			b.PaymentChannelClaims = append(b.PaymentChannelClaims, paymentChannelClaim)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, paymentChannelClaim.Hash)
		case PaymentChannelCreate:
			paymentChannelCreate := new(model.PaymentChannelCreate)
			parseTransactionCommonFields(&paymentChannelCreate.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			paymentChannelCreate.Amount = json.Get(t, "Amount").String()
			paymentChannelCreate.Destination = json.Get(t, "Destination").String()
			paymentChannelCreate.DestinationTag = json.Get(t, "DestinationTag").Int()
			paymentChannelCreate.SettleDelay = json.Get(t, "SettleDelay").Int()
			paymentChannelCreate.PublicKey = json.Get(t, "PublicKey").String()
			paymentChannelCreate.CancelAfter = json.Get(t, "CancelAfter").Int()
			b.PaymentChannelCreates = append(b.PaymentChannelCreates, paymentChannelCreate)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, paymentChannelCreate.Hash)
		case PaymentChannelFund:
			paymentChannelFund := new(model.PaymentChannelFund)
			parseTransactionCommonFields(&paymentChannelFund.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			paymentChannelFund.Amount = json.Get(t, "Amount").String()
			paymentChannelFund.Channel = json.Get(t, "Channel").String()
			paymentChannelFund.Expiration = json.Get(t, "Expiration").Int()
			b.PaymentChannelFunds = append(b.PaymentChannelFunds, paymentChannelFund)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, paymentChannelFund.Hash)
		case SetRegularKey:
			setRegularKey := new(model.SetRegularKey)
			parseTransactionCommonFields(&setRegularKey.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			setRegularKey.RegularKey = json.Get(t, "RegularKey").String()
			b.SetRegularKeys = append(b.SetRegularKeys, setRegularKey)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, setRegularKey.Hash)
		case SignerListSet:
			signerListSet := new(model.SignerListSet)
			parseTransactionCommonFields(&signerListSet.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			signerListSet.SignerQuorum = json.Get(t, "SignerQuorum").Int()
			signerListSet.SignerEntries = json.Get(t, "SignerEntries").String()
			b.SignerListSets = append(b.SignerListSets, signerListSet)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, signerListSet.Hash)
		case TrustSet:
			trustSet := new(model.TrustSet)
			parseTransactionCommonFields(&trustSet.TransactionCommonFields, t, b.Ledger.LedgerIndex, b.Ledger.Timestamp)
			trustSet.LimitAmount = json.Get(t, "LimitAmount").String()
			trustSet.QualityIn = json.Get(t, "QualityIn").Int()
			trustSet.QualityOut = json.Get(t, "QualityOut").Int()
			b.TrustSets = append(b.TrustSets, trustSet)
			log.Debug("splitter ripple: %s transaction %s .", transactionType, trustSet.Hash)
		}
	}
	b.Ledger.TransactionLength = len(transactionList)
	elapsedTime := time.Since(startTime)
	log.Debug("splitter ripple: parse ledger %d, txs %d, elapsed time %s", b.Ledger.LedgerIndex, b.Ledger.TransactionLength, elapsedTime.String())
	return b, nil
}
// parseBigInt converts a decimal or 0x-prefixed hexadecimal string into a
// math.HexOrDecimal256. A value that fails to parse, or that is greater than
// or equal to maxBigNumber, is replaced by defaultBigNumber (-1). The
// returned error is always nil; it exists only to fit the caller's pattern.
func parseBigInt(s string) (math.HexOrDecimal256, error) {
	// A bare "0x" prefix with no digits is treated as zero.
	if s == "0x" {
		s = "0x0"
	}
	v, ok := math.ParseBig256(s)
	if !ok || v.Cmp(maxBigNumber) >= 0 {
		return math.HexOrDecimal256(*defaultBigNumber), nil
	}
	return math.HexOrDecimal256(*v), nil
}
{
}
\ No newline at end of file
package xrp
package ripple
import (
"fmt"
......@@ -8,7 +8,7 @@ import (
"github.com/jdcloud-bds/bds/common/kafka"
"github.com/jdcloud-bds/bds/common/log"
"github.com/jdcloud-bds/bds/service"
model "github.com/jdcloud-bds/bds/service/model/xrp"
model "github.com/jdcloud-bds/bds/service/model/ripple"
"github.com/xeipuuv/gojsonschema"
"strconv"
"strings"
......@@ -38,7 +38,7 @@ type XRPSplitter struct {
missedBlockList map[int64]bool
latestSaveDataTimestamp time.Time
latestReceiveMessageTimestamp time.Time
databaseWorkerChan chan *XRPBlockData
databaseWorkerChan chan *XRPLedgerData
databaseWorkerStopChan chan bool
}
......@@ -46,7 +46,7 @@ func NewSplitter(cfg *SplitterConfig) (*XRPSplitter, error) {
var err error
s := new(XRPSplitter)
s.cfg = cfg
s.databaseWorkerChan = make(chan *XRPBlockData, cfg.DatabaseWorkerBuffer)
s.databaseWorkerChan = make(chan *XRPLedgerData, cfg.DatabaseWorkerBuffer)
s.databaseWorkerStopChan = make(chan bool, 0)
s.missedBlockList = make(map[int64]bool, 0)
httpClient := httputils.NewRestClientWithBasicAuth(s.cfg.User, s.cfg.Password)
......@@ -61,11 +61,11 @@ func NewSplitter(cfg *SplitterConfig) (*XRPSplitter, error) {
}
s.cronWorker = NewCronWorker(s)
err = s.cronWorker.Prepare()
if err != nil {
log.DetailError(err)
return nil, err
}
//err = s.cronWorker.Prepare()
//if err != nil {
// log.DetailError(err)
// return nil, err
//}
return s, nil
}
......@@ -73,14 +73,14 @@ func NewSplitter(cfg *SplitterConfig) (*XRPSplitter, error) {
func (s *XRPSplitter) Start() {
err := s.cronWorker.Start()
if err != nil {
log.Error("splitter xrp: cron worker start error")
log.Error("splitter ripple: cron worker start error")
log.DetailError(err)
return
}
err = s.cfg.Consumer.Start(s.cfg.Topic)
if err != nil {
log.Error("splitter xrp: consumer start error")
log.Error("splitter ripple: consumer start error")
log.DetailError(err)
return
}
......@@ -89,8 +89,8 @@ func (s *XRPSplitter) Start() {
go s.databaseWorker(i)
}
log.Debug("splitter xrp: consumer start topic %s", s.cfg.Topic)
log.Debug("splitter xrp: database enable is %v", s.cfg.DatabaseEnable)
log.Debug("splitter ripple: consumer start topic %s", s.cfg.Topic)
log.Debug("splitter ripple: database enable is %v", s.cfg.DatabaseEnable)
for {
select {
......@@ -102,23 +102,32 @@ func (s *XRPSplitter) Start() {
if s.cfg.JSONSchemaValidationEnable {
ok, err := s.jsonSchemaValid(string(message.Data))
if err != nil {
log.Error("splitter xrp: json schema valid error")
log.Error("splitter ripple: json schema valid error")
}
if !ok {
log.Warn("splitter xrp: json schema valid failed")
log.Warn("splitter ripple: json schema valid failed")
}
}
data, err := ParseBlock(string(message.Data))
data, err := ParseLedger(string(message.Data))
if err != nil {
stats.Add(MetricParseDataError, 1)
log.Error("splitter xrp: block parse error, retry after 5s")
log.Error("splitter ripple: ledger parse error, retry after 5s")
log.DetailError(err)
time.Sleep(time.Second * 5)
goto START
}
if s.cfg.DatabaseEnable {
exist, err := s.IsExistingLedger(data)
if err != nil {
stats.Add(MetricParseDataError, 1)
log.Error("splitter ripple: check ledger exist error, retry after 2s")
log.DetailError(err)
time.Sleep(time.Second * 2)
goto START
}
if s.cfg.DatabaseEnable && !exist {
s.databaseWorkerChan <- data
s.cfg.Consumer.MarkOffset(message)
}
......@@ -130,47 +139,24 @@ func (s *XRPSplitter) Stop() {
s.cronWorker.Stop()
}
func (s *XRPSplitter) CheckBlock(curBlock *XRPBlockData) (bool, int64) {
func (s *XRPSplitter) IsExistingLedger(cur *XRPLedgerData) (bool, error) {
db := service.NewDatabase(s.cfg.Engine)
height := int64(-1)
prevBlock := make([]*model.Block, 0)
err := db.Where("height = ?", curBlock.Block.LedgerIndex-1).Find(&prevBlock)
ledgers := make([]*model.Ledger, 0)
err := db.Where("ledger_index = ?", cur.Ledger.LedgerIndex).Find(&ledgers)
if err != nil {
log.DetailError(err)
return false, height
return false, err
}
if len(prevBlock) != 1 {
log.Warn("splitter xrp: can not find previous block %d", curBlock.Block.LedgerIndex-1)
blocks := make([]*model.Block, 0)
err = db.Desc("height").Limit(1).Find(&blocks)
if err != nil {
log.DetailError(err)
} else {
if len(blocks) > 0 {
height = blocks[0].LedgerIndex + 1
} else {
log.Warn("splitter xrp: database empty")
height = 0
}
}
return false, height
if len(ledgers) == 0 {
//log.Warn("splitter ripple: can not find current ledger %d", cur.Ledger.LedgerIndex)
return false, nil
}
if prevBlock[0].Hash != curBlock.Block.ParentHash {
log.Warn("splitter xrp: block %d is revert", curBlock.Block.LedgerIndex-1)
err = s.remoteHandler.SendBatchBlock(prevBlock[0].LedgerIndex, curBlock.Block.LedgerIndex)
if err != nil {
log.DetailError(err)
}
height = prevBlock[0].LedgerIndex
return false, height
}
log.Debug("splitter xrp: check block %d pass", curBlock.Block.LedgerIndex)
return true, height
return true, nil
}
func (s *XRPSplitter) SaveBlock(data *XRPBlockData) error {
func (s *XRPSplitter) LedgerInsert(data *XRPLedgerData) error {
startTime := time.Now()
tx := service.NewTransaction(s.cfg.Engine)
defer tx.Close()
......@@ -182,73 +168,173 @@ func (s *XRPSplitter) SaveBlock(data *XRPBlockData) error {
stats.Add(MetricDatabaseRollback, 1)
return err
}
blockTemp := new(model.Block)
blockTemp.LedgerIndex = data.Block.LedgerIndex
has, err := tx.Get(blockTemp)
var affected int64
ledgers := make([]*model.Ledger, 0)
ledgers = append(ledgers, data.Ledger)
affected, err = tx.BatchInsert(ledgers)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
if has {
if blockTemp.Hash == data.Block.Hash || blockTemp.CloseTime < data.Block.CloseTime {
log.Warn("splitter xrp: block %d has been stored", data.Block.LedgerIndex)
log.Debug("splitter ripple: ledger write %d rows", affected)
if len(data.AccountSets) != 0 {
affected, err = tx.BatchInsert(data.AccountSets)
if err != nil {
_ = tx.Rollback()
return nil
} else {
log.Warn("splitter xrp: block %d need to be replaced by the new one", data.Block.LedgerIndex)
s.revertLedger(data.Block.LedgerIndex)
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction AccountSets write %d rows", affected)
}
var affected int64
blocks := make([]*model.Block, 0)
blocks = append(blocks, data.Block)
affected, err = tx.BatchInsert(blocks)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
if len(data.DepositPreauths) != 0 {
affected, err = tx.BatchInsert(data.DepositPreauths)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction DepositPreauths write %d rows", affected)
}
log.Debug("splitter xrp: block write %d rows", affected)
affected, err = tx.BatchInsert(data.Transactions)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
if len(data.EscrowCancels) != 0 {
affected, err = tx.BatchInsert(data.EscrowCancels)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction EscrowCancels write %d rows", affected)
}
log.Debug("splitter xrp: transaction write %d rows", affected)
affected, err = tx.BatchInsert(data.AffectedNodes)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
if len(data.EscrowCreates) != 0 {
affected, err = tx.BatchInsert(data.EscrowCreates)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction EscrowCreates write %d rows", affected)
}
log.Debug("splitter xrp: affected nodes write %d rows", affected)
affected, err = tx.BatchInsert(data.Paths)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
if len(data.EscrowFinishes) != 0 {
affected, err = tx.BatchInsert(data.EscrowFinishes)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction EscrowFinishes write %d rows", affected)
}
log.Debug("splitter xrp: paths write %d rows", affected)
affected, err = tx.BatchInsert(data.Amounts)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
if len(data.OfferCancels) != 0 {
affected, err = tx.BatchInsert(data.OfferCancels)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction OfferCancels write %d rows", affected)
}
if len(data.OfferCreates) != 0 {
affected, err = tx.BatchInsert(data.OfferCreates)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction OfferCreates write %d rows", affected)
}
if len(data.Payments) != 0 {
affected, err = tx.BatchInsert(data.Payments)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction Payments write %d rows", affected)
}
if len(data.PaymentChannelClaims) != 0 {
affected, err = tx.BatchInsert(data.PaymentChannelClaims)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction PaymentChannelClaims write %d rows", affected)
}
if len(data.PaymentChannelCreates) != 0 {
affected, err = tx.BatchInsert(data.PaymentChannelCreates)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction PaymentChannelCreates write %d rows", affected)
}
if len(data.PaymentChannelFunds) != 0 {
affected, err = tx.BatchInsert(data.PaymentChannelFunds)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction PaymentChannelFunds write %d rows", affected)
}
if len(data.SetRegularKeys) != 0 {
affected, err = tx.BatchInsert(data.SetRegularKeys)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction SetRegularKeys write %d rows", affected)
}
if len(data.SignerListSets) != 0 {
affected, err = tx.BatchInsert(data.SignerListSets)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction SignerListSets write %d rows", affected)
}
if len(data.TrustSets) != 0 {
affected, err = tx.BatchInsert(data.TrustSets)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
log.Debug("splitter ripple: transaction TrustSets write %d rows", affected)
}
log.Debug("splitter xrp: amounts write %d rows", affected)
err = tx.Commit()
if err != nil {
_ = tx.Rollback()
......@@ -257,11 +343,12 @@ func (s *XRPSplitter) SaveBlock(data *XRPBlockData) error {
}
tx.Close()
stats.Add(MetricDatabaseCommit, 1)
elaspedTime := time.Now().Sub(startTime)
elapsedTime := time.Now().Sub(startTime)
s.latestSaveDataTimestamp = time.Now()
log.Debug("splitter xrp: block %d write done elasped: %s", data.Block.LedgerIndex, elaspedTime.String())
log.Debug("splitter ripple: ledger data %d write done elapsed: %s", data.Ledger.LedgerIndex, elapsedTime.String())
return nil
}
func (s *XRPSplitter) revertLedger(ledgerIndex int64) error {
tx := service.NewTransaction(s.cfg.Engine)
defer tx.Close()
......@@ -273,23 +360,7 @@ func (s *XRPSplitter) revertLedger(ledgerIndex int64) error {
stats.Add(MetricDatabaseRollback, 1)
return err
}
sql := fmt.Sprintf("delete from xrp_block where ledger_index = %d", ledgerIndex)
_, err = tx.Exec(sql)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
sql = fmt.Sprintf("delete from xrp_transaction where ledger_index = %d", ledgerIndex)
_, err = tx.Exec(sql)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
sql = fmt.Sprintf("delete from xrp_amount where ledger_index = %d", ledgerIndex)
sql := fmt.Sprintf("delete from ripple_ledger where ledger_index = %d", ledgerIndex)
_, err = tx.Exec(sql)
if err != nil {
_ = tx.Rollback()
......@@ -297,15 +368,7 @@ func (s *XRPSplitter) revertLedger(ledgerIndex int64) error {
stats.Add(MetricDatabaseRollback, 1)
return err
}
sql = fmt.Sprintf("delete from xrp_transaction_affected_nodes where ledger_index = %d", ledgerIndex)
_, err = tx.Exec(sql)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
stats.Add(MetricDatabaseRollback, 1)
return err
}
sql = fmt.Sprintf("delete from xrp_path where ledger_index = %d", ledgerIndex)
sql = fmt.Sprintf("delete from ripple_ac where ledger_index = %d", ledgerIndex)
_, err = tx.Exec(sql)
if err != nil {
_ = tx.Rollback()
......@@ -322,35 +385,36 @@ func (s *XRPSplitter) revertLedger(ledgerIndex int64) error {
tx.Close()
return nil
}
func (s *XRPSplitter) CheckMissedBlock() ([]int64, error) {
func (s *XRPSplitter) CheckMissedLedger() ([]int64, error) {
missedList := make([]int64, 0)
db := service.NewDatabase(s.cfg.Engine)
sql := fmt.Sprintf("SELECT height FROM xrp_block ORDER BY height ASC")
sql := fmt.Sprintf("SELECT ledger_index FROM ripple_ledger ORDER BY ledger_index ASC")
data, err := db.QueryString(sql)
if err != nil {
return nil, err
}
blockList := make([]*model.Block, 0)
ledgerList := make([]*model.Ledger, 0)
for _, v := range data {
block := new(model.Block)
tmp := v["height"]
height, err := strconv.ParseInt(tmp, 10, 64)
ledger := new(model.Ledger)
tmp := v["ledger_index"]
ledgerIndex, err := strconv.ParseInt(tmp, 10, 64)
if err != nil {
return nil, err
}
block.LedgerIndex = height
blockList = append(blockList, block)
ledger.LedgerIndex = ledgerIndex
ledgerList = append(ledgerList, ledger)
}
if len(blockList) > 0 {
if len(ledgerList) > 0 {
checkList := make(map[int64]bool, 0)
for _, b := range blockList {
for _, b := range ledgerList {
checkList[b.LedgerIndex] = true
}
for i := int64(0); i <= blockList[len(blockList)-1].LedgerIndex; i++ {
for i := int64(32570); i <= ledgerList[len(ledgerList)-1].LedgerIndex; i++ {
if _, ok := checkList[i]; !ok {
missedList = append(missedList, i)
}
......@@ -365,13 +429,13 @@ func (s *XRPSplitter) jsonSchemaValid(data string) (bool, error) {
dataLoader := gojsonschema.NewStringLoader(data)
result, err := gojsonschema.Validate(s.jsonSchemaLoader, dataLoader)
if err != nil {
log.Error("splitter xrp: json schema validation error")
log.Error("splitter ripple: json schema validation error")
log.DetailError(err)
return false, err
}
if !result.Valid() {
for _, err := range result.Errors() {
log.Error("splitter xrp: data invalid %s", strings.ToLower(err.String()))
log.Error("splitter ripple: data invalid %s", strings.ToLower(err.String()))
return false, nil
}
stats.Add(MetricVaildationError, 1)
......@@ -379,24 +443,24 @@ func (s *XRPSplitter) jsonSchemaValid(data string) (bool, error) {
stats.Add(MetricVaildationSuccess, 1)
}
elaspedTime := time.Now().Sub(startTime)
log.Debug("splitter xrp: json schema validation elasped %s", elaspedTime)
log.Debug("splitter ripple: json schema validation elapsed %s", elaspedTime)
return true, nil
}
func (s *XRPSplitter) databaseWorker(i int) {
log.Info("splitter xrp: starting database worker %d", i)
log.Info("splitter ripple: starting database worker %d", i)
for {
select {
case data := <-s.databaseWorkerChan:
err := s.SaveBlock(data)
err := s.LedgerInsert(data)
if err != nil {
log.Error("splitter xrp: block %d save error, retry after 5s", data.Block.LedgerIndex)
log.Error("splitter ripple: ledger data %d insert error, retry after 5s", data.Ledger.LedgerIndex)
log.DetailError(err)
}
case stop := <-s.databaseWorkerStopChan:
if stop {
msg := fmt.Sprintf("splitter xrp: database worker %d stopped", i)
log.Info("splitter xrp: ", msg)
msg := fmt.Sprintf("splitter ripple: database worker %d stopped", i)
log.Info("splitter ripple: ", msg)
return
}
}
......
package xrp
package ripple
import (
"github.com/jdcloud-bds/bds/common/cron"
......@@ -24,7 +24,7 @@ func (w *CronWorker) Prepare() error {
}
for _, job := range jobList {
log.Debug("worker xrp: prepare %s", job.Name())
log.Debug("worker ripple: prepare %s", job.Name())
err := job.run()
if err != nil {
return err
......@@ -47,18 +47,18 @@ func (w *CronWorker) Start() error {
return err
}
stats.Add(MetricCronWorkerJob, 1)
log.Info("worker xrp: add job %s", job.Name())
log.Info("worker ripple: add job %s", job.Name())
}
expr = config.SplitterConfig.CronXRPSetting.GetBatchBlockExpr
expr = config.SplitterConfig.CronXRPSetting.GetBatchLedgerExpr
if len(expr) != 0 {
job = newGetBatchBlockJob(w.splitter)
job = newGetBatchLedgerJob(w.splitter)
err = w.crontab.AddJob(job.Name(), expr, job)
if err != nil {
return err
}
stats.Add(MetricCronWorkerJob, 1)
log.Info("worker xrp: add job %s", job.Name())
log.Info("worker ripple: add job %s", job.Name())
}
w.crontab.Start()
......
......@@ -18,7 +18,7 @@ import (
"github.com/jdcloud-bds/bds/splitter/ltc"
"github.com/jdcloud-bds/bds/splitter/tron"
"github.com/jdcloud-bds/bds/splitter/xlm"
"github.com/jdcloud-bds/bds/splitter/xrp"
"github.com/jdcloud-bds/bds/splitter/ripple"
)
type Splitter struct {
......@@ -756,7 +756,7 @@ func (p *Splitter) Run() {
//Loading configuration file information of XRP node
if config.SplitterConfig.XRPSetting.Enable {
xrpConfig := &xrp.SplitterConfig{
xrpConfig := &ripple.SplitterConfig{
p.xrpEngine,
p.xrpConsumer,
config.SplitterConfig.KafkaXRPSetting.Topic,
......@@ -770,7 +770,7 @@ func (p *Splitter) Run() {
config.SplitterConfig.XRPSetting.DatabaseWorkerNumber,
config.SplitterConfig.XRPSetting.DatabaseWorkerBuffer,
}
xrpSplitter, err := xrp.NewSplitter(xrpConfig)
xrpSplitter, err := ripple.NewSplitter(xrpConfig)
if err != nil {
panic(err)
}
......
package xrp

import (
	"github.com/jdcloud-bds/bds/common/cuckoofilter"
	"github.com/jdcloud-bds/bds/common/metric"
	model "github.com/jdcloud-bds/bds/service/model/xrp"
	"math/big"
	"sync"
)

// Metric keys registered on the package-level "xrp" stats map, plus the
// numeric codes used to classify accounts and transactions.
const (
	// Counters incremented by the splitter pipeline.
	MetricReceiveMessages = "receive_messages"
	MetricParseDataError  = "parse_data_error"
	// NOTE(review): "Vaildation" is a typo for "Validation"; names kept
	// as-is since the published string values are already correct.
	MetricVaildationSuccess = "validation_success"
	MetricVaildationError   = "validation_error"
	MetricDatabaseRollback  = "database_rollback"
	MetricDatabaseCommit    = "database_commit"
	// Cron-job counters.
	MetricCronWorkerJob               = "cron_worker_job"
	MetricCronWorkerJobUpdateMetaData = "cron_worker_job_update_meta_data"
	MetricCronWorkerJobGetBatchBlock  = "cron_worker_job_get_batch_block"
	// NOTE(review): contract-address / pool-name / miner concepts do not
	// exist on the XRP ledger; these constants look inherited from an
	// ETH-style splitter — confirm they are actually used for xrp.
	MetricCronWorkerJobRefreshContractAddresses = "cron_worker_job_refresh_contract_addresses"
	MetricCronWorkerJobRefreshPoolName          = "cron_worker_job_refresh_pool_name"
	MetricRPCCall                               = "rpc_call"
	MetricRevertBlock                           = "revert_block"

	// Account classification codes.
	AccountTypeNormal   = 0
	AccountTypeContract = 1
	AccountTypeMiner    = 2

	// Transaction classification codes.
	TransactionTypeNormal   = 0
	TransactionTypeContract = 1
)

// Package-level shared state.
var (
	// stats aggregates the metric counters above under the "xrp" namespace.
	stats = metric.NewMap("xrp")
	// Lookup caches; presumably refreshed by the cron jobs named in the
	// metric constants above — confirm against the cron worker.
	contractAddressFilter = cuckoofilter.New()
	poolNameMap           = new(sync.Map)
	// Upper bound for parsed big integers: values at or above it (and
	// unparsable input) collapse to defaultBigNumber (-1) in parseBigInt.
	maxBigNumber, _     = new(big.Int).SetString("100000000000000000000000000000000000000", 10)
	defaultBigNumber, _ = new(big.Int).SetString("-1", 10)
)

// XRPBlockData bundles one parsed ledger with every derived row set that
// the database workers persist together.
type XRPBlockData struct {
	Block        *model.Block
	Transactions []*model.Transaction
	// Accounts is NOT populated by ParseBlock — presumably filled by a
	// later stage; TODO confirm the producer.
	Accounts      []*model.Account
	AffectedNodes []*model.AffectedNodes
	Paths         []*model.Path
	Amounts       []*model.Amount
}
package xrp
import (
"fmt"
"github.com/jdcloud-bds/bds/common/json"
"github.com/jdcloud-bds/bds/common/log"
"github.com/jdcloud-bds/bds/common/math"
"github.com/jdcloud-bds/bds/service"
model "github.com/jdcloud-bds/bds/service/model/xrp"
"strings"
"time"
)
// FormatAmount builds an Amount row from the object form of a transaction
// amount field (a JSON document with currency/issuer/value keys), inheriting
// the ledger index, close time, and parent transaction hash from the
// supplied transaction. The AmountType tag is left for the caller to set.
func FormatAmount(transaction *model.Transaction, input string) *model.Amount {
	out := &model.Amount{
		LedgerIndex: transaction.LedgerIndex,
		CloseTime:   transaction.CloseTime,
		ParentHash:  transaction.Hash,
	}
	out.Currency = json.Get(input, "currency").String()
	out.Issuer = json.Get(input, "issuer").String()
	out.Value = json.Get(input, "value").String()
	return out
}
// ParseBlock decodes one raw XRP ledger JSON document into an XRPBlockData
// aggregate: the ledger header (Block), its transactions, and the derived
// per-transaction Path / AffectedNodes / Amount rows.
//
// Amount-like fields (Amount, SendMax, DeliverMin, TakerGets, TakerPays,
// LimitAmount) are either a plain integer or a JSON object
// ({currency, issuer, value}) for issued currencies. Object-form values are
// extracted into b.Amounts with an AmountType tag, and the numeric column on
// the transaction is set to the -1 sentinel instead.
func ParseBlock(data string) (*XRPBlockData, error) {
	startTime := time.Now()
	var err error
	b := new(XRPBlockData)
	b.Block = new(model.Block)
	b.Transactions = make([]*model.Transaction, 0)
	b.AffectedNodes = make([]*model.AffectedNodes, 0)
	b.Paths = make([]*model.Path, 0)
	b.Amounts = make([]*model.Amount, 0)
	// NOTE(review): b.Accounts is never initialized or appended to in this
	// function — presumably populated by a later stage; confirm callers.

	// Ledger header fields, keyed exactly as the node emits them
	// ("seqNum" really is camel-cased in the source JSON, unlike the rest).
	b.Block.Accepted = int(json.Get(data, "accepted").Int())
	b.Block.AccountHash = json.Get(data, "account_hash").String()
	b.Block.CloseFlags = int(json.Get(data, "close_flags").Int())
	b.Block.CloseTime = json.Get(data, "close_time").Int()
	b.Block.CloseTimeHuman = json.Get(data, "close_time_human").String()
	b.Block.CloseTimeResolution = int(json.Get(data, "close_time_resolution").Int())
	b.Block.Closed = int(json.Get(data, "closed").Int())
	b.Block.Hash = json.Get(data, "hash").String()
	b.Block.LedgerHash = json.Get(data, "ledger_hash").String()
	b.Block.LedgerIndex = json.Get(data, "ledger_index").Int()
	b.Block.ParentCloseTime = json.Get(data, "parent_close_time").Int()
	b.Block.ParentHash = json.Get(data, "parent_hash").String()
	b.Block.SeqNum = json.Get(data, "seqNum").Int()
	// total_coins goes through big-int parsing so oversized or malformed
	// values degrade to the -1 sentinel instead of overflowing.
	totalCoins := json.Get(data, "total_coins").String()
	b.Block.TotalCoins, err = parseBigInt(totalCoins)
	if err != nil {
		log.Error("splitter xrp: block %d TotalCoins '%s' parse error", b.Block.LedgerIndex, totalCoins)
		return nil, err
	}
	b.Block.TransactionHash = json.Get(data, "transaction_hash").String()
	txList := json.Get(data, "transactions").Array()
	for _, txItem := range txList {
		// NOTE(review): txItem.String() is re-evaluated for every Get call
		// below; hoisting it into a local would avoid the repeated work.
		transaction := new(model.Transaction)
		// Common signing/envelope fields.
		transaction.Account = json.Get(txItem.String(), "Account").String()
		transaction.TransactionType = json.Get(txItem.String(), "TransactionType").String()
		transaction.Fee = json.Get(txItem.String(), "Fee").Int()
		transaction.Sequence = json.Get(txItem.String(), "Sequence").Int()
		transaction.AccountTxnID = json.Get(txItem.String(), "AccountTxnID").String()
		transaction.Flags = json.Get(txItem.String(), "Flags").Int()
		transaction.LastLedgerSequence = json.Get(txItem.String(), "LastLedgerSequence").Int()
		transaction.Memos = json.Get(txItem.String(), "Memos").String()
		transaction.Signers = json.Get(txItem.String(), "Signers").String()
		transaction.SourceTag = json.Get(txItem.String(), "SourceTag").Int()
		transaction.SigningPubKey = json.Get(txItem.String(), "SigningPubKey").String()
		transaction.TxnSignature = json.Get(txItem.String(), "TxnSignature").String()
		transaction.Hash = json.Get(txItem.String(), "hash").String()
		// Ledger context is copied from the enclosing block.
		transaction.LedgerIndex = b.Block.LedgerIndex
		// Execution result comes from the attached metaData object.
		transaction.TransactionResult = json.Get(txItem.String(), "metaData.TransactionResult").String()
		transaction.TransactionIndex = int(json.Get(txItem.String(), "metaData.TransactionIndex").Int())
		transaction.Validated = int(json.Get(txItem.String(), "validated").Int())
		transaction.Date = json.Get(txItem.String(), "date").Int()
		transaction.CloseTime = b.Block.CloseTime
		// Amount-type tags used for object-form amounts:
		// 1=Amount, 2=SendMax, 4=TakerGets, 5=TakerPays, 6=LimitAmount.
		// NOTE(review): DeliverMin also uses tag 2 (same as SendMax) and
		// tag 3 is unused — looks like a copy-paste slip; confirm intent.
		tempAmount := json.Get(txItem.String(), "Amount").String()
		if strings.Contains(tempAmount, "{") {
			amount := FormatAmount(transaction, tempAmount)
			amount.AmountType = 1
			b.Amounts = append(b.Amounts, amount)
			transaction.Amount = -1
		} else {
			transaction.Amount = json.Get(txItem.String(), "Amount").Int()
		}
		tempSendMax := json.Get(txItem.String(), "SendMax").String()
		if strings.Contains(tempSendMax, "{") {
			amount := FormatAmount(transaction, tempSendMax)
			amount.AmountType = 2
			b.Amounts = append(b.Amounts, amount)
			transaction.SendMax = -1
		} else {
			transaction.SendMax = json.Get(txItem.String(), "SendMax").Int()
		}
		tempDeliverMin := json.Get(txItem.String(), "DeliverMin").String()
		if strings.Contains(tempDeliverMin, "{") {
			amount := FormatAmount(transaction, tempDeliverMin)
			amount.AmountType = 2
			b.Amounts = append(b.Amounts, amount)
			transaction.DeliverMin = -1
		} else {
			transaction.DeliverMin = json.Get(txItem.String(), "DeliverMin").Int()
		}
		// Payment / offer fields.
		transaction.Destination = json.Get(txItem.String(), "Destination").String()
		transaction.DestinationTag = json.Get(txItem.String(), "DestinationTag").Int()
		transaction.InvoiceID = json.Get(txItem.String(), "InvoiceID").String()
		transaction.Expiration = json.Get(txItem.String(), "Expiration").Int()
		transaction.OfferSequence = int(json.Get(txItem.String(), "OfferSequence").Int())
		tempTakerGets := json.Get(txItem.String(), "TakerGets").String()
		if strings.Contains(tempTakerGets, "{") {
			amount := FormatAmount(transaction, tempTakerGets)
			amount.AmountType = 4
			b.Amounts = append(b.Amounts, amount)
			transaction.TakerGets = -1
		} else {
			transaction.TakerGets = json.Get(txItem.String(), "TakerGets").Int()
		}
		tempTakerPays := json.Get(txItem.String(), "TakerPays").String()
		if strings.Contains(tempTakerPays, "{") {
			amount := FormatAmount(transaction, tempTakerPays)
			amount.AmountType = 5
			b.Amounts = append(b.Amounts, amount)
			transaction.TakerPays = -1
		} else {
			transaction.TakerPays = json.Get(txItem.String(), "TakerPays").Int()
		}
		tempLimitAmount := json.Get(txItem.String(), "LimitAmount").String()
		if strings.Contains(tempLimitAmount, "{") {
			amount := FormatAmount(transaction, tempLimitAmount)
			amount.AmountType = 6
			b.Amounts = append(b.Amounts, amount)
			transaction.LimitAmount = -1
		} else {
			transaction.LimitAmount = json.Get(txItem.String(), "LimitAmount").Int()
		}
		// Trust-line / account-settings / escrow / payment-channel fields;
		// absent keys simply yield zero values from the JSON getter.
		transaction.QualityIn = json.Get(txItem.String(), "QualityIn").Int()
		transaction.QualityOut = json.Get(txItem.String(), "QualityOut").Int()
		transaction.ClearFlag = int(json.Get(txItem.String(), "ClearFlag").Int())
		transaction.Domain = json.Get(txItem.String(), "Domain").String()
		transaction.EmailHash = json.Get(txItem.String(), "EmailHash").String()
		transaction.MessageKey = json.Get(txItem.String(), "MessageKey").String()
		transaction.SetFlag = int(json.Get(txItem.String(), "SetFlag").Int())
		transaction.TransferRate = int(json.Get(txItem.String(), "TransferRate").Int())
		transaction.TickSize = int(json.Get(txItem.String(), "TickSize").Int())
		transaction.RegularKey = json.Get(txItem.String(), "RegularKey").String()
		transaction.SignerQuorum = int(json.Get(txItem.String(), "SignerQuorum").Int())
		transaction.CancelAfter = json.Get(txItem.String(), "CancelAfter").Int()
		transaction.FinishAfter = json.Get(txItem.String(), "FinishAfter").Int()
		transaction.Condition = json.Get(txItem.String(), "Condition").String()
		transaction.Owner = json.Get(txItem.String(), "Owner").String()
		transaction.Fulfillment = json.Get(txItem.String(), "Fulfillment").String()
		transaction.SettleDelay = json.Get(txItem.String(), "SettleDelay").Int()
		transaction.PublicKey = json.Get(txItem.String(), "PublicKey").String()
		transaction.Channel = json.Get(txItem.String(), "Channel").String()
		transaction.Balance = json.Get(txItem.String(), "Balance").Int()
		transaction.Authorize = json.Get(txItem.String(), "Authorize").String()
		transaction.UnAuthorize = json.Get(txItem.String(), "UnAuthorize").String()
		//paths
		// "Paths" is an array of paths, each itself an array of steps.
		// p = index of the path within this transaction,
		// i = index of the step within that path.
		txPaths := json.Get(txItem.String(), "Paths").Array()
		p := int64(0)
		for _, pathItem := range txPaths {
			inPaths := pathItem.Array()
			i := int64(0)
			for _, pathAgent := range inPaths {
				path := new(model.Path)
				path.CloseTime = b.Block.CloseTime
				path.LedgerIndex = b.Block.LedgerIndex
				path.Type = json.Get(pathAgent.String(), "type").Int()
				path.Currency = json.Get(pathAgent.String(), "currency").String()
				path.Issuer = json.Get(pathAgent.String(), "issuer").String()
				path.ParentHash = transaction.Hash
				path.InTxIndex = p
				path.InPathIndex = i
				b.Paths = append(b.Paths, path)
				i++
			}
			p++
		}
		//affectedNodes
		// Each affected-node entry is an object whose single key is the
		// node type (e.g. CreatedNode/ModifiedNode) mapping to the payload.
		// NOTE(review): if an entry ever carried more than one key, only
		// the last one iterated would be kept.
		txNodes := json.Get(txItem.String(), "metaData.AffectedNodes").Array()
		for _, nodeItem := range txNodes {
			node := new(model.AffectedNodes)
			node.CloseTime = b.Block.CloseTime
			node.LedgerIndex = b.Block.LedgerIndex
			node.ParentHash = transaction.Hash
			for key, value := range nodeItem.Map() {
				node.NodeType = key
				node.LedgerEntryType = json.Get(value.String(), "LedgerEntryType").String()
				node.NodeLedgerIndex = json.Get(value.String(), "LedgerIndex").String()
				node.PreviousTxnID = json.Get(value.String(), "PreviousTxnID").String()
				node.PreviousTxnLgrSeq = json.Get(value.String(), "PreviousTxnLgrSeq").Int()
				node.FullJsonStr = value.String()
			}
			b.AffectedNodes = append(b.AffectedNodes, node)
		}
		transaction.PathesLen = len(txPaths)
		transaction.AffectedNodesLen = len(txNodes)
		b.Transactions = append(b.Transactions, transaction)
	}
	b.Block.TransactionLength = len(b.Transactions)
	elaspedTime := time.Now().Sub(startTime)
	log.Debug("splitter xrp: parse block %d, txs %d, elasped time %s", b.Block.LedgerIndex, b.Block.TransactionLength, elaspedTime.String())
	return b, nil
}
// revertMiner rolls back the per-account miner statistics credited for the
// block at the given height: it decrements miner_count and miner_uncle_count
// on the xrp_account row matching the block's recorded miner address.
// Returns the first database error encountered, if any.
//
// NOTE(review): XRP ledgers have no miners or uncles — this routine (and the
// xrp_block.height/miner columns it targets) looks inherited from an
// account-model chain splitter; confirm the schema before relying on it.
func revertMiner(height int64, tx *service.Transaction) error {
	begin := time.Now()
	index := "revert_miner"

	// Undo the miner-count credit for this height's miner.
	query := fmt.Sprintf("UPDATE a SET a.miner_count = a.miner_count - 1 FROM xrp_account a"+
		" JOIN (SELECT miner FROM xrp_block WHERE height = '%d') b"+
		" ON a.address = b.miner ", height)
	minerRows, err := tx.Execute(query)
	if err != nil {
		log.DetailError(err)
		return err
	}

	// Undo the uncle-count credit the same way.
	query = fmt.Sprintf("UPDATE a SET a.miner_uncle_count = a.miner_uncle_count - 1 FROM xrp_account a"+
		" JOIN (SELECT miner FROM xrp_block WHERE height = '%d') b"+
		" ON a.address = b.miner ", height)
	uncleRows, err := tx.Execute(query)
	if err != nil {
		log.DetailError(err)
		return err
	}

	log.Debug("splitter xrp index: %s affected %d %d elasped %s", index, minerRows, uncleRows, time.Since(begin).String())
	return nil
}
// removeHexPrefixAndToLower normalizes a hex string: a leading "0x" (exact
// case) is dropped, and the remainder is lower-cased.
func removeHexPrefixAndToLower(s string) string {
	trimmed := strings.TrimPrefix(s, "0x")
	return strings.ToLower(trimmed)
}
// parseBigInt converts a decimal or 0x-hex string into a bounded
// HexOrDecimal256. A bare "0x" is treated as zero; unparsable input and
// values at or above maxBigNumber both collapse to defaultBigNumber (-1).
// The error result is always nil and is kept only for signature stability.
func parseBigInt(s string) (math.HexOrDecimal256, error) {
	if s == "0x" {
		s = "0x0"
	}
	parsed, ok := math.ParseBig256(s)
	if !ok || parsed.Cmp(maxBigNumber) >= 0 {
		return math.HexOrDecimal256(*defaultBigNumber), nil
	}
	return math.HexOrDecimal256(*parsed), nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册