跳转至

同步模块SyncInfo

同步模块SyncInfo

在跨链服务整个架构中作为通过GateWay网关模块拉取目标链区块并供Storage存储模块解析存储到本地数据库的模块,除了集成到Storage中使用,也可以作为单独模块使用,但是不管如何使用,一定需要配合GateWay模块,启动同步服务需要3个入参,目标链的adapterInfo、syncInfo(高度和hash信息)和gateWay的host,启动同步服务后,即可向gateWayHost对应的GateWay网关同步拉取指定目标链的区块

  • AdapterInfo:设置链Hash和链类型(链Hash即适配器ID,方便GateWay端根据该ID和链类型来选用对应的配置文件来构造相应的适配器),链类型如RepChain、FiscoBcos-2.x、Fabric-2.x

  • SyncInfo:同步指定高度的区块区块的上一个区块的高度height和区块Hash,会从该height和hash的下一个区块开始同步,一直同步到最新高度

    注意

    比如想要同步高度5及之后的区块,那么syncInfo中localHeight设置为上一个区块的高度即4,locBlkHash设置为高度为5的区块的preHash及高度为4的区块的blkHash

  • gateWayHost:网关的地址,ip:port

项目代码文件简要介绍

└── rep
    └── cross
        ├── protos
        │   └── rep
        │       └── Peer.java                   protobuf生成类,定义了RepChain所使用的区块交易等数据结构
        └── sync
            ├── SyncChainService.java           同步服务的主类,调用ChainInfoClient来通过网关拉取目标链区块
            ├── client
            │   ├── ChainHttpClient.java        封装了发送http请求的工具类
            │   └── ChainInfoClient.java        用来对接GateWay,查询ChainInfo,拉取指定高度区块
            ├── exception
            │   └── SyncBlockException.java     同步区块异常,如果同步程序捕获到该异常,同步程序会一直重复尝试拉取该高度区块
            └── model
                ├── ChainInfo.java
                └── SyncInfo.java

构建需要准备的环境及工具

1. Java 1.8 (必须)

建议使用zulu-jdk(oracle-jdk也可),从官网直接下载,也可使用IDEA下载(File->Project Structure->SDKS,然后选择"+",选择Download JDK),选择Azul-zulu-community,jdk-8。

  1. 如果是直接下载,需要配置环境变量

  2. 如果是在ide中,下载后,指定相应版本jdk即可

2. Maven项目构建管理工具 (必须)

Maven 是一个项目管理工具,可以对 Java 项目进行构建、依赖管理,是一个自动化构建工具。

自动化构建工具:将原材料(java、js、css、html....)->产品(可发布项目)

编译-打包-部署-测试 -> 自动构建可从官网下载并配置相关信息。

注意

  • 需要将Maven的配置到系统的环境变量中,以方便在终端中使用maven命令。
  • 需要将同一个Maven地址配置到使用的编译器中,防止项目构建时出现其他问题。

在环境工具准备完成之后,在项目根目录下,打开终端,执行如下命令(或者是用IDEA中对应的maven插件执行相应命令),将模块打包为jar包并安装到本地maven仓库(专业术语),之后别的依赖SyncInfo的跨链模块就可以引用该jar包了。

mvn clear
mvn compile
mvn install

其他跨链模块即可通过如下方式引用:

// maven
<dependency>
    <groupId>net.repchain</groupId>
    <artifactId>repchain-cross-syncinfo</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
// gradle
implementation 'net.repchain:repchain-cross-syncinfo:1.0-SNAPSHOT'
// sbt
libraryDependencies += "net.repchain" % "repchain-cross-syncinfo" % "1.0-SNAPSHOT"

使用方式

注意

同步不同类型的区块链区块时,需要注意有一些细微差别如下:

  • RepChain区块高度从1开始,并且创世快没有preHash,因此从创世块开始同步的话,SyncInfo如下:
    SyncInfo repSyncInfo = SyncInfo.builder().setLocalHeight(0).setLocBlkHash("").build();
    
  • Fabric区块高度从0开始,并且创世块没有preHash,因此从创始块开始同步的话,SyncInfo如下:
    SyncInfo repSyncInfo = SyncInfo.builder().setLocalHeight(-1).setLocBlkHash("").build();
    
  • FiscoBcos区块高度从0开始,并且创始块有preHash,因此从创世块开始同步的话,SyncInfo如下:
    // "0x0000000000000000000000000000000000000000000000000000000000000000"为创世块的preHash
    SyncInfo repSyncInfo = SyncInfo.builder().setLocalHeight(-1).setLocBlkHash("0x0000000000000000000000000000000000000000000000000000000000000000").build();
    

如下的测试例中,首先为SyncChainService设置了三个参数,即adapterInfo、syncInfo和gateWayHost,启动同步服务后,即可通过回调方法onSuccess获取到在dataType中定义的Block数据结构的区块,可以通过代码查看Block都具有哪些属性,接着即可针对性的做处理了,如调用Storage模块来解析和存储区块和交易到数据库

package rep.cross.storage;

import com.rcjava.protos.Peer;
import org.fisco.bcos.web3j.protocol.core.methods.response.BcosBlock;
import org.junit.jupiter.api.*;
import rep.cross.adapter.AdapterInfo;
import rep.cross.adapter.AdapterType;
import rep.cross.model.Block;
import rep.cross.storage.impl.BlockStorageStub;
import rep.cross.storage.impl.fisco.FiscoBlockStorage;
import rep.cross.storage.impl.rep.RepBlockStorage;
import rep.cross.sync.SyncChainService;
import rep.cross.sync.exception.SyncBlockException;
import rep.cross.sync.model.SyncInfo;

/**
 * @author zyf
 */
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SyncServiceTest extends BaseTest {

    // repchain
    SyncInfo repSyncInfo = SyncInfo.builder().setLocalHeight(0).setLocBlkHash("").build();
    AdapterInfo repAdapterInfo = AdapterInfo.builder().setId("79e5d00cc30f40d3a857ec4511c4fa4e").setType(AdapterType.REPCHAIN).build();

    // fisco-bcos
    SyncInfo fiscoSyncInfo = SyncInfo.builder().setLocalHeight(-1).setLocBlkHash("0x0000000000000000000000000000000000000000000000000000000000000000").build();
    AdapterInfo fiscoAdapterInfo = AdapterInfo.builder().setId("74e8089786e54f859112d20666719ac6").setType(AdapterType.FISCO_BCOS_2).build();

  // fabric2
    SyncInfo fabric2SyncInfo = SyncInfo.builder().setLocalHeight(-1).setLocBlkHash("").build();
    AdapterInfo fabric2AdapterInfo = AdapterInfo.builder().setId("74e8089786e54f859112d20666719ac7").setType(AdapterType.FABRIC_2).build();

    BlockStorageStub blockStorageStub = new BlockStorageStub(dataSource);

    @Test
    @DisplayName("测试repchain同步")
    void testRepSyncService() throws InterruptedException {
        // 初始化同步程序
        SyncChainService repSyncChainService = new SyncChainService(repAdapterInfo, repSyncInfo, gateWayHost);
        RepBlockStorage repBlockStorage = new RepBlockStorage(dataSource);
        logger.info("准备同步...");
        repSyncChainService.start(new SyncChainService.CallBack() {
            @Override
            public void onSuccess(AdapterInfo info, Block block) throws SyncBlockException {
                // repchain
                Peer.Block repBlock = objectMapper.convertValue(block.getRawBlock(), Peer.Block.class);
                try {
                    // repchain
                    System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(repBlock));
                    // 存储解析
                    blockStorageStub.saveBlock(info, block, gateWayHost);
                } catch (Exception e) {
                    throw new SyncBlockException(e.getMessage());
                }
                System.out.println(block.getHash());
            }

            @Override
            public void onError(SyncBlockException syncBlockException) {

            }
        });
        Thread.currentThread().join();
    }

    @Test
    @DisplayName("测试fisco同步")
    void testFiscoSyncService() throws InterruptedException {
        SyncChainService fiscoSyncChainService = new SyncChainService(fiscoAdapterInfo, fiscoSyncInfo, gateWayHost);
        FiscoBlockStorage fiscoBlockStorage = new FiscoBlockStorage(dataSource);
        fiscoSyncChainService.start(new SyncChainService.CallBack() {
            @Override
            public void onSuccess(AdapterInfo info, Block block) throws SyncBlockException {
                // fisco-bcos
                BcosBlock.Block bcosBlock = objectMapper.convertValue(block.getRawBlock(), BcosBlock.Block.class);
                try {
                    // fisco-bcos
                    System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(bcosBlock));
                    // 存储解析
                    blockStorageStub.saveBlock(info, block, gateWayHost);
                } catch (Exception e) {
                    throw new SyncBlockException(e.getMessage());
                }
                System.out.println(block.getHash());
            }

            @Override
            public void onError(SyncBlockException syncBlockException) {

            }
        });
        Thread.currentThread().join();
    }

    @Test
    @DisplayName("测试fabric2同步")
    void testFabric2SyncService() throws InterruptedException {
        SyncChainService fabric2SyncChainService = new SyncChainService(fabric2AdapterInfo, fabric2SyncInfo, gateWayHost);
        Fabric2BlockStorage fiscoBlockStorage = new Fabric2BlockStorage(dataSource);
        fabric2SyncChainService.start(new SyncChainService.CallBack() {
            @SneakyThrows
            @Override
            public void onSuccess(AdapterInfo info, Block block) throws SyncBlockException {
                // fabric2,注意这里BlockInfo的构造
                Constructor<BlockInfo> constructor = BlockInfo.class.getDeclaredConstructor(Common.Block.class);
                constructor.setAccessible(true);
                BlockInfo fabric2block = constructor.newInstance(objectMapper.convertValue(((HashMap)block.getRawBlock()).get("block"), Common.Block.class));
                try {
                    // fabric2
                    System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(fabric2block));
                    // 存储解析
                    blockStorageStub.saveBlock(info, block, gateWayHost);
                } catch (Exception e) {
                    throw new SyncBlockException(e.getMessage());
                }
                System.out.println(block.getHash());
            }

            @Override
            public void onError(SyncBlockException syncBlockException) {

            }
        });
        Thread.currentThread().join();
    }
}

回到页面顶部