同步模块SyncInfo
同步模块SyncInfo
在跨链服务整个架构中作为通过GateWay网关模块拉取目标链区块并供Storage存储模块解析存储到本地数据库的模块,除了集成到Storage中使用,也可以作为单独模块使用,但是不管如何使用,一定需要配合GateWay模块,启动同步服务需要3个入参,目标链的adapterInfo、syncInfo(高度和hash信息)和gateWay的host,启动同步服务后,即可向gateWayHost对应的GateWay网关同步拉取指定目标链的区块
-
AdapterInfo:设置链Hash和链类型(链Hash即适配器ID,方便GateWay端根据该ID和链类型来选用对应的配置文件来构造相应的适配器),链类型如RepChain、FiscoBcos-2.x、Fabric-2.x
-
SyncInfo:同步指定高度的区块区块的上一个区块的高度height和区块Hash,会从该height和hash的下一个区块开始同步,一直同步到最新高度
注意
比如想要同步高度5及之后的区块,那么syncInfo中localHeight设置为上一个区块的高度即4,locBlkHash设置为高度为5的区块的preHash及高度为4的区块的blkHash
-
gateWayHost:网关的地址,
ip:port
项目代码文件简要介绍
└── rep
└── cross
├── protos
│ └── rep
│ └── Peer.java protobuf生成类,定义了RepChain所使用的区块交易等数据结构
└── sync
├── SyncChainService.java 同步服务的主类,调用ChainInfoClient来通过网关拉取目标链区块
├── client
│ ├── ChainHttpClient.java 封装了发送http请求的工具类
│ └── ChainInfoClient.java 用来对接GateWay,查询ChainInfo,拉取指定高度区块
├── exception
│ └── SyncBlockException.java 同步区块异常,如果同步程序捕获到该异常,同步程序会一直重复尝试拉取该高度区块
└── model
├── ChainInfo.java
└── SyncInfo.java
构建需要准备的环境及工具
1. Java 1.8 (必须)
建议使用
zulu-jdk
(oracle-jdk也可),从官网直接下载,也可使用IDEA下载(File->Project Structure->SDKS,然后选择"+",选择Download JDK),选择Azul-zulu-community,jdk-8。
如果是直接下载,需要配置环境变量
如果是在ide中,下载后,指定相应版本jdk即可
2. Maven项目构建管理工具 (必须)
Maven 是一个项目管理工具,可以对 Java 项目进行构建、依赖管理,是一个自动化构建工具。
自动化构建工具:将原材料(java、js、css、html....)->产品(可发布项目)
编译-打包-部署-测试 -> 自动构建可从官网下载并配置相关信息。
注意
- 需要将Maven的配置到系统的环境变量中,以方便在终端中使用maven命令。
- 需要将同一个Maven地址配置到使用的编译器中,防止项目构建时出现其他问题。
在环境工具准备完成之后,在项目根目录下,打开终端,执行如下命令(或者是用IDEA中对应的maven插件执行相应命令),将模块打包为jar包并安装到本地maven仓库(专业术语),之后别的依赖SyncInfo的跨链模块就可以引用该jar包了。
其他跨链模块即可通过如下方式引用:
// maven
<dependency>
<groupId>net.repchain</groupId>
<artifactId>repchain-cross-syncinfo</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
使用方式
注意
同步不同类型的区块链区块时,需要注意有一些细微差别如下:
- RepChain区块高度从1开始,并且创世快没有preHash,因此从创世块开始同步的话,SyncInfo如下:
- Fabric区块高度从0开始,并且创世块没有preHash,因此从创始块开始同步的话,SyncInfo如下:
- FiscoBcos区块高度从0开始,并且创始块有preHash,因此从创世块开始同步的话,SyncInfo如下:
如下的测试例中,首先为SyncChainService设置了三个参数,即adapterInfo、syncInfo和gateWayHost,启动同步服务后,即可通过回调方法onSuccess获取到在dataType中定义的Block数据结构的区块,可以通过代码查看Block都具有哪些属性,接着即可针对性的做处理了,如调用Storage模块来解析和存储区块和交易到数据库
package rep.cross.storage;
import com.rcjava.protos.Peer;
import org.fisco.bcos.web3j.protocol.core.methods.response.BcosBlock;
import org.junit.jupiter.api.*;
import rep.cross.adapter.AdapterInfo;
import rep.cross.adapter.AdapterType;
import rep.cross.model.Block;
import rep.cross.storage.impl.BlockStorageStub;
import rep.cross.storage.impl.fisco.FiscoBlockStorage;
import rep.cross.storage.impl.rep.RepBlockStorage;
import rep.cross.sync.SyncChainService;
import rep.cross.sync.exception.SyncBlockException;
import rep.cross.sync.model.SyncInfo;
/**
* @author zyf
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SyncServiceTest extends BaseTest {
// repchain
SyncInfo repSyncInfo = SyncInfo.builder().setLocalHeight(0).setLocBlkHash("").build();
AdapterInfo repAdapterInfo = AdapterInfo.builder().setId("79e5d00cc30f40d3a857ec4511c4fa4e").setType(AdapterType.REPCHAIN).build();
// fisco-bcos
SyncInfo fiscoSyncInfo = SyncInfo.builder().setLocalHeight(-1).setLocBlkHash("0x0000000000000000000000000000000000000000000000000000000000000000").build();
AdapterInfo fiscoAdapterInfo = AdapterInfo.builder().setId("74e8089786e54f859112d20666719ac6").setType(AdapterType.FISCO_BCOS_2).build();
// fabric2
SyncInfo fabric2SyncInfo = SyncInfo.builder().setLocalHeight(-1).setLocBlkHash("").build();
AdapterInfo fabric2AdapterInfo = AdapterInfo.builder().setId("74e8089786e54f859112d20666719ac7").setType(AdapterType.FABRIC_2).build();
BlockStorageStub blockStorageStub = new BlockStorageStub(dataSource);
@Test
@DisplayName("测试repchain同步")
void testRepSyncService() throws InterruptedException {
// 初始化同步程序
SyncChainService repSyncChainService = new SyncChainService(repAdapterInfo, repSyncInfo, gateWayHost);
RepBlockStorage repBlockStorage = new RepBlockStorage(dataSource);
logger.info("准备同步...");
repSyncChainService.start(new SyncChainService.CallBack() {
@Override
public void onSuccess(AdapterInfo info, Block block) throws SyncBlockException {
// repchain
Peer.Block repBlock = objectMapper.convertValue(block.getRawBlock(), Peer.Block.class);
try {
// repchain
System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(repBlock));
// 存储解析
blockStorageStub.saveBlock(info, block, gateWayHost);
} catch (Exception e) {
throw new SyncBlockException(e.getMessage());
}
System.out.println(block.getHash());
}
@Override
public void onError(SyncBlockException syncBlockException) {
}
});
Thread.currentThread().join();
}
@Test
@DisplayName("测试fisco同步")
void testFiscoSyncService() throws InterruptedException {
SyncChainService fiscoSyncChainService = new SyncChainService(fiscoAdapterInfo, fiscoSyncInfo, gateWayHost);
FiscoBlockStorage fiscoBlockStorage = new FiscoBlockStorage(dataSource);
fiscoSyncChainService.start(new SyncChainService.CallBack() {
@Override
public void onSuccess(AdapterInfo info, Block block) throws SyncBlockException {
// fisco-bcos
BcosBlock.Block bcosBlock = objectMapper.convertValue(block.getRawBlock(), BcosBlock.Block.class);
try {
// fisco-bcos
System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(bcosBlock));
// 存储解析
blockStorageStub.saveBlock(info, block, gateWayHost);
} catch (Exception e) {
throw new SyncBlockException(e.getMessage());
}
System.out.println(block.getHash());
}
@Override
public void onError(SyncBlockException syncBlockException) {
}
});
Thread.currentThread().join();
}
@Test
@DisplayName("测试fabric2同步")
void testFabric2SyncService() throws InterruptedException {
SyncChainService fabric2SyncChainService = new SyncChainService(fabric2AdapterInfo, fabric2SyncInfo, gateWayHost);
Fabric2BlockStorage fiscoBlockStorage = new Fabric2BlockStorage(dataSource);
fabric2SyncChainService.start(new SyncChainService.CallBack() {
@SneakyThrows
@Override
public void onSuccess(AdapterInfo info, Block block) throws SyncBlockException {
// fabric2,注意这里BlockInfo的构造
Constructor<BlockInfo> constructor = BlockInfo.class.getDeclaredConstructor(Common.Block.class);
constructor.setAccessible(true);
BlockInfo fabric2block = constructor.newInstance(objectMapper.convertValue(((HashMap)block.getRawBlock()).get("block"), Common.Block.class));
try {
// fabric2
System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(fabric2block));
// 存储解析
blockStorageStub.saveBlock(info, block, gateWayHost);
} catch (Exception e) {
throw new SyncBlockException(e.getMessage());
}
System.out.println(block.getHash());
}
@Override
public void onError(SyncBlockException syncBlockException) {
}
});
Thread.currentThread().join();
}
}