Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 2 additions & 222 deletions go.sum

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion goent.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,10 @@
github.com/bitxhub/crypto-gm v0.0.0-20210825015341-e035b646648d
github.com/bitxhub/offchain-transmission v0.0.0-20220616032739-b2eba23f65b7

replace github.com/ultramesh/crypto-gm => git.hyperchain.cn/dmlab/crypto-gm v0.2.14
replace github.com/bitxhub/crypto-gm => git.hyperchain.cn/bitxhub/bxh-crypto-gm v0.0.0-20210825015341-e035b646648d

replace github.com/bitxhub/offchain-transmission => git.hyperchain.cn/bitxhub/offchain-transmission v0.0.0-20220616032739-b2eba23f65b7

replace github.com/bitxhub/pier-ha => git.hyperchain.cn/bitxhub/pier-ha v0.0.0-20210125070843-56ccae0d88a4

replace github.com/ultramesh/crypto-gm => git.hyperchain.cn/ultramesh/crypto-gm v0.2.14
17 changes: 13 additions & 4 deletions internal/adapt/appchain_adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (

var _ adapt.Adapt = (*AppchainAdapter)(nil)

const (
DirectSrcRegisterErr = "remote service is not registered"
DirectDestAuditErr = "remote service is not allowed to call dest address"
)

type AppchainAdapter struct {
mode string
config *repo.Config
Expand Down Expand Up @@ -193,14 +198,18 @@ func (a *AppchainAdapter) SendIBTP(ibtp *pb.IBTP) error {
}
}

var genFailReceipt bool
if !res.Status {
err := &adapt.SendIbtpError{Err: fmt.Sprintf("fail to send ibtp %s with type %v: %s", ibtp.ID(), ibtp.Type, res.Message)}
if strings.Contains(res.Message, "invalid multi-signature") {
err.Status = adapt.Proof_Invalid
} else if a.config.Mode.Type == repo.DirectMode &&
(strings.Contains(res.Message, "dest address is not in local white list") ||
strings.Contains(res.Message, "remote service is not registered") ||
strings.Contains(res.Message, "remote service is not allowed to call dest address")) {
}
if a.config.Mode.Type == repo.DirectMode &&
(strings.Contains(res.Message, DirectSrcRegisterErr) ||
strings.Contains(res.Message, DirectDestAuditErr)) {
genFailReceipt = true
}
if genFailReceipt {
ibtp.Type = pb.IBTP_RECEIPT_FAILURE
a.ibtpC <- ibtp
err.Status = adapt.Other_Error
Expand Down
44 changes: 27 additions & 17 deletions internal/adapt/bxh_adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
servicemgr "github.com/meshplus/bitxhub-core/service-mgr"
"github.com/meshplus/bitxhub-kit/types"
"github.com/meshplus/bitxhub-model/constant"
"github.com/meshplus/bitxhub-model/pb"
Expand All @@ -29,7 +30,7 @@ var (
ErrMetaOutOfDate = fmt.Errorf("interchain meta is out of date")
)

const maxChSize = 1 << 10
const maxChSize = 1024

// BxhAdapter represents the necessary data for sync tx from bitxhub
type BxhAdapter struct {
Expand Down Expand Up @@ -148,14 +149,13 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {
tx.Extra = proof

var (
receipt *pb.Receipt
retErr error
count = uint64(0)
retErr error
count = uint64(0)
)
if err := retry.Retry(func(attempt uint) error {
hash, err := b.client.SendTransaction(tx, nil)
receipt, err := b.client.SendTransactionWithReceipt(tx, nil)
if err != nil {
b.logger.Errorf("Send ibtp error: %s", err.Error())
b.logger.Errorf("Send ibtp with receipt error: %s", err.Error())
if errors.Is(err, rpcx.ErrRecoverable) {
count++
if count == 5 && ibtp.Type == pb.IBTP_INTERCHAIN {
Expand All @@ -168,16 +168,7 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {
tx, _ = b.client.GenerateIBTPTx(ibtp)
return err
}
}
// ensure getting receipt successful
if err := retry.Retry(func(attempt uint) error {
receipt, err = b.client.GetReceipt(hash)
if err != nil {
return fmt.Errorf("get tx receipt by hash %s: %w", hash, err)
}
return nil
}, strategy.Wait(1*time.Second)); err != nil {
b.logger.Errorf("retry error to get receipt: %w", err)
return err
}

// most err occur in bxh's handleIBTP
Expand Down Expand Up @@ -324,8 +315,27 @@ func (b *BxhAdapter) SendIBTP(ibtp *pb.IBTP) error {

func (b *BxhAdapter) GetServiceIDList() ([]string, error) {
if b.mode == repo.RelayMode {
return nil, nil
tx, err := b.client.GenerateContractTx(pb.TransactionData_BVM, constant.ServiceMgrContractAddr.Address(),
"GetServicesByAppchainID", rpcx.String(b.appchainId))
if err != nil {
b.logger.Errorf("GetServiceIDList GenerateContractTx err:%s", err)
panic(err)
}

ret := getTxView(b.client, tx)
service := make([]*servicemgr.Service, 0)
if err = json.Unmarshal(ret, &service); err != nil {
b.logger.Errorf("GetServiceIDList unmarshal err:%s", err)
panic(err)
}
serviceIDList := make([]string, 0)
for _, s := range service {
serviceIDList = append(serviceIDList, s.ServiceID)
}
return serviceIDList, nil
}

// union model
tx, err := b.client.GenerateContractTx(pb.TransactionData_BVM, constant.InterchainContractAddr.Address(),
"GetAllServiceIDs")
if err != nil {
Expand Down
84 changes: 49 additions & 35 deletions internal/adapt/bxh_adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,30 +208,30 @@ func TestSendIBTP(t *testing.T) {
client.EXPECT().GenerateIBTPTx(gomock.Any()).Return(tx, nil).AnyTimes()

networkDownTime := 0
client.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).DoAndReturn(
func(tx *pb.BxhTransaction, opts *rpcx.TransactOpts) (string, error) {
networkDownTime++
if networkDownTime == 1 {
return "", rpcx.ErrBrokenNetwork
} else if networkDownTime == 2 {
return "", rpcx.ErrReconstruct
}
return hash, nil
}).MaxTimes(4)

receiptNetworkDownTime := 0
r := &pb.Receipt{
Ret: []byte("this is a test"),
Status: pb.Receipt_SUCCESS,
}
receiptNetworkDownTime := 0
client.EXPECT().GetReceipt(hash).DoAndReturn(
func(ha string) (*pb.Receipt, error) {
rf := &pb.Receipt{
Ret: []byte("fail receipt"),
Status: pb.Receipt_FAILED,
}
client.EXPECT().SendTransactionWithReceipt(gomock.Any(), gomock.Any()).DoAndReturn(
func(tx *pb.BxhTransaction, opts *rpcx.TransactOpts) (*pb.Receipt, error) {
networkDownTime++
if networkDownTime == 1 {
return nil, rpcx.ErrBrokenNetwork
} else if networkDownTime == 2 {
return nil, rpcx.ErrReconstruct
}
receiptNetworkDownTime++
if receiptNetworkDownTime == 1 {
return nil, fmt.Errorf("not found receipt")
return rf, fmt.Errorf("not found receipt")
}
return r, nil
}).MaxTimes(2)
}).MaxTimes(4)

require.Nil(t, adapter.SendIBTP(&pb.IBTP{}))

receipt := &pb.Receipt{
Expand All @@ -246,7 +246,6 @@ func TestSendIBTP(t *testing.T) {
//require.Nil(t, err)

// test for receipt failed situation
client.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return(hash, nil).AnyTimes()
errMsg1 := fmt.Sprintf("%s: appchain not registerd", noBindRule)
//errMsg2 := fmt.Sprintf("%s: current appchain err", CurAppchainNotAvailable)
errMsg3 := fmt.Sprintf("%s: source service err", SrcServiceNotAvailable)
Expand All @@ -265,7 +264,7 @@ func TestSendIBTP(t *testing.T) {
Ret: []byte(errMsg1),
Status: pb.Receipt_FAILED,
}
client.EXPECT().GetReceipt(hash).Return(failReceipt, nil).AnyTimes()
client.EXPECT().SendTransactionWithReceipt(gomock.Any(), gomock.Any()).Return(failReceipt, nil).AnyTimes()
err := adapter.SendIBTP(&pb.IBTP{})
ibtpErr1, ok := err.(*adapt.SendIbtpError)
require.Equal(t, true, ok)
Expand Down Expand Up @@ -421,25 +420,40 @@ func TestGetServiceIDList(t *testing.T) {
Ret: data1,
Status: pb.Receipt_SUCCESS,
}

client.EXPECT().GenerateContractTx(gomock.Any(), gomock.Any(),
gomock.Any()).Return(&pb.BxhTransaction{}, nil).MaxTimes(2)
client.EXPECT().SendView(gomock.Any()).Return(reciept1, nil).MaxTimes(1)
gomock.Any(), gomock.Any()).Return(&pb.BxhTransaction{}, nil).Times(1)
client.EXPECT().SendView(gomock.Any()).Return(reciept1, nil).Times(1)

// get registered bxhIds in Union Mode
bxhIds := make([]string, 0)
bxhIds = append(bxhIds, strconv.Itoa(bxhId1), strconv.Itoa(bxhId2))
data2, err := json.Marshal(bxhIds)
loggers.InitializeLogger(repo.DefaultConfig())
ids1, err := relayAdapter.GetServiceIDList()
require.Nil(t, err)
require.Equal(t, 2, len(ids1))
require.Equal(t, serviceId1, ids1[0])
require.Equal(t, serviceId2, ids1[1])

client.EXPECT().GenerateContractTx(gomock.Any(), gomock.Any(),
gomock.Any()).Return(&pb.BxhTransaction{}, nil).Times(2)

services2 := make([]string, 0)
services2 = append(services2, fmt.Sprintf("%s:%s:%s", strconv.Itoa(bxhId1), chainId, serviceId1),
fmt.Sprintf("%s:%s:%s", strconv.Itoa(bxhId2), chainId, serviceId2))
data2, err := json.Marshal(services2)
reciept2 := &pb.Receipt{
Ret: data2,
Status: pb.Receipt_SUCCESS,
}
client.EXPECT().SendView(gomock.Any()).Return(reciept2, nil).MaxTimes(1)

loggers.InitializeLogger(repo.DefaultConfig())
ids1, err := relayAdapter.GetServiceIDList()
require.Nil(t, err)
require.Equal(t, 0, len(ids1))
client.EXPECT().SendView(gomock.Any()).Return(reciept2, nil).Times(1)

// get registered bxhIds in Union Mode
bxhIds := make([]string, 0)
bxhIds = append(bxhIds, strconv.Itoa(bxhId1), strconv.Itoa(bxhId2))
data3, err := json.Marshal(bxhIds)
reciept3 := &pb.Receipt{
Ret: data3,
Status: pb.Receipt_SUCCESS,
}
client.EXPECT().SendView(gomock.Any()).Return(reciept3, nil).Times(1)
ids2, err := unionAdapter.GetServiceIDList()
require.Nil(t, err)
require.Equal(t, 1, len(ids2))
Expand Down Expand Up @@ -636,15 +650,15 @@ func getIBTP(t *testing.T, index uint64, typ pb.IBTP_Type) *pb.IBTP {
}
}

func getServiceID(t *testing.T) []string {
services := make([]string, 0)
func getServiceID(t *testing.T) []*service_mgr.Service {
services := make([]*service_mgr.Service, 0)
service1 := &service_mgr.Service{
ServiceID: fmt.Sprintf("%s:%s:%s", strconv.Itoa(bxhId1), chainId, serviceId1),
ServiceID: serviceId1,
}
service2 := &service_mgr.Service{
ServiceID: fmt.Sprintf("%s:%s:%s", strconv.Itoa(bxhId2), chainId, serviceId2),
ServiceID: serviceId2,
}
services = append(services, service1.ServiceID, service2.ServiceID)
services = append(services, service1, service2)
require.Equal(t, 2, len(services))

return services
Expand Down
46 changes: 46 additions & 0 deletions internal/exchanger/exchanger.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ func New(typ, srcChainId, srcBxhId string, opts ...Option) (*Exchanger, error) {
return exchanger, nil
}

func ParseFullServiceID(id string) (string, string, string, error) {
splits := strings.Split(id, ":")
if len(splits) != 3 {
return "", "", "", fmt.Errorf("invalid full service ID: %s", id)
}
return splits[0], splits[1], splits[2], nil
}

func (ex *Exchanger) checkService(appServiceList, bxhServiceList []string) error {
appServiceM := make(map[string]struct{}, len(appServiceList))
for _, fullServiceID := range appServiceList {
_, _, s, err := ParseFullServiceID(fullServiceID)
if err != nil {
ex.logger.Errorf("ParseFullServiceID err:%s", err)
return err
}
appServiceM[s] = struct{}{}
}
for _, serviceId := range bxhServiceList {
if _, ok := appServiceM[serviceId]; !ok {
return fmt.Errorf("service:[%s] has been registered in bitxhub, "+
"but not registered in broker contract", serviceId)
}
}
return nil
}

func (ex *Exchanger) Start() error {
// init meta info
var (
Expand Down Expand Up @@ -107,6 +134,25 @@ func (ex *Exchanger) Start() error {
}
}

if repo.RelayMode == ex.mode {
bxhServiceList := make([]string, 0)
if err = retry.Retry(func(attempt uint) error {
bxhServiceList, err = ex.destAdapt.GetServiceIDList()
if err != nil {
ex.logger.Errorf("bxhAdapter GetServiceIDList err:%s", err)
return err
}
return nil
}, strategy.Wait(2*time.Second)); err != nil {
return err
}

err = ex.checkService(serviceList, bxhServiceList)
if err != nil {
panic(err)
}
}

if repo.UnionMode == ex.mode {
ex.recoverUnion(ex.srcServiceMeta, ex.destServiceMeta)
// add self_interchains to srcServiceMeta
Expand Down
2 changes: 2 additions & 0 deletions internal/exchanger/exchanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func testNormalStartRelay(t *testing.T) {
ReceiptCounter: map[string]uint64{fullToService: 1},
SourceReceiptCounter: make(map[string]uint64),
InterchainCounter: make(map[string]uint64)}, nil).AnyTimes()

mockAdaptRelay.EXPECT().GetServiceIDList().Return([]string{to}, nil).AnyTimes()
//mockAdaptRelay.EXPECT().QueryInterchain(gomock.Eq("1356:fabric:data")).
// Return(&pb.Interchain{ID: "1356:fabric:transfer"}, nil).AnyTimes()

Expand Down
37 changes: 37 additions & 0 deletions internal/repo/packrd/packed-packr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions internal/repo/repo-packr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.