基于Vert.x和SpringBoot实现响应式开发

Vert.x是作为一个事件总线的设计,以保证应用中不同部分以一种非堵塞的线程安全方式通讯,其原理来自于Erlang和Akka,它是能充分利用多核处理器性能并实现高并发编程的需求。

所有Vert.x 的VERTICLE缺省是一个单线程,不像Node.js只有一个单线程,vert.x能在很多线程中运行很多VERTICLE,每个线程一个VERTICLE,这样你可以指定几个VERTICLE作为"worker",能够以多线程方式运行你的任务。

Vert.x可以通过使用Hazelcast实现底层的事件总线的多节点集群。

下面以一个案例说明如何基于 Spring Boot, Spring Data JPA, 和 Spring REST开发一个聊天室系统?

下面没有引入Vert.x而是使用传统Spting语法实现的系统:

@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {

**public** **static** **void** main(String\[\] args) {
    ApplicationContext ctx = SpringApplication.run(Application.**class**, args);

    System.out.println("Let's inspect the beans provided by Spring Boot:");

    String\[\] beanNames = ctx.getBeanDefinitionNames();
    Arrays.sort(beanNames);
    **for** (String beanName : beanNames) {
        System.out.println(beanName);
    }
}

@Bean
**public** DataSource dataSource() {
    EmbeddedDatabaseBuilder builder = **new** EmbeddedDatabaseBuilder();
    **return** builder.setType(EmbeddedDatabaseType.HSQL).build();
}

@Bean
**public** EntityManagerFactory entityManagerFactory() {
    HibernateJpaVendorAdapter vendorAdapter = **new** HibernateJpaVendorAdapter();
    vendorAdapter.setGenerateDdl(**true**);

    LocalContainerEntityManagerFactoryBean factory = **new** LocalContainerEntityManagerFactoryBean();
    factory.setJpaVendorAdapter(vendorAdapter);
    factory.setPackagesToScan("com.zanclus.data.entities");
    factory.setDataSource(dataSource());
    factory.afterPropertiesSet();

    **return** factory.getObject();
}

@Bean
**public** PlatformTransactionManager transactionManager(**final** EntityManagerFactory emf) {
    **final** JpaTransactionManager txManager = **new** JpaTransactionManager();
    txManager.setEntityManagerFactory(emf);
    **return** txManager;
}

}
<p>

上述代码中 @Bean是提供了访问JPA的EntityManager, TransactionManager, 和 DataSource. 这是一个标准的Springboot案例源码。前端使用CustomerEnpoints作为RESTful的控制器,接受客户端的请求。

CustomerVerticle 类是作为@Component,意味着在启动时,Spring会初始化这个类,它也有@PostConstruct标注的start方法,这样Verticle 在启动被加载时会执行其方法内容:

@PostConstruct

**public** **void** start() throws Exception {
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    router.get("/v1/customer/:id")
            .produces("application/json")
            .blockingHandler(**this**::getCustomerById);
    router.put("/v1/customer")
            .consumes("application/json")
            .produces("application/json")
            .blockingHandler(**this**::addCustomer);
    router.get("/v1/customer")
            .produces("application/json")
            .blockingHandler(**this**::getAllCustomers);
    vertx.createHttpServer().requestHandler(router::accept).listen(8080);
}

<p>

在这个启动方法中,引入了vertx-web库包:Router,能够让用户定义将请求过滤转换为HTTP URLs, methods, 和头部header 等信息,BodyHandler 是能将POST/PUT提交的内容转变为一个JSON对象,Vert.x能够将其作为RoutingContext的一部分进行处理;RoutingContext包含Vert.x请求对象和响应对象和任何Http请求中数据,包括POST内容数据等等,blockingHandler是接受RoutingContext作为输入参数。

注意到blockingHandler方法使用了Java 8的方法引用,如this::getCustomerById,这比使用lambda将逻辑插入blockingHandler 中更具有代码可读性。getCustomerById的方法如下:

private void getCustomerById(RoutingContext rc) {

    log.info("Request for single customer");
    Long id = Long.parseLong(rc.request().getParam("id"));
    **try** {
        Customer customer = dao.findOne(id);
        **if** (customer==**null**) {
            rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");
        } **else** {
            rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));
        }
    } **catch** (JsonProcessingException jpe) {
        rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
        log.error("Server error", jpe);
    }
}

<p>

在getCustomerById方法中实现真正的业务处理,比如调用DAO访问数据库,序列化和反序列化对象,类似在MVC的控制器中实现一样。

完整见Github

这段代码的主要问题是性能的扩展性有限,这段代码要么可以在Tomcat中运行,要么以微服务方式在嵌入式服务器如Jetty或undertow中运行,总之,只能是一个请求捆绑一个线程,当在等待I/O等堵塞操作时,所有的资源都会消耗在等待上,这种局部堵塞引发整体等待是一种资源浪费。

而如果我们引入了Vert.x,使用@Bean注入了Vertx实例:

public class Application {

**public** **static** **void** main(String\[\] args) {
    SpringApplication.run(Application.**class**, args);
}

**private** Vertx vertx;

_/\*\*
 \* Create an {@link ObjectMapper} for use in (de)serializing objects to/from JSON
 \* @return An instance of {@link ObjectMapper}
 */_ @Bean
**public** ObjectMapper objectMapper() {
    **return** **new** ObjectMapper(**new** JsonFactory());
} _/\*\*
 \* A singleton instance of {@link Vertx} which is used throughout the application
 \* @return An instance of {@link Vertx}
 */_ @Bean
**public** Vertx getVertxInstance() {
    **if** (**this**.vertx==**null**) {
        **this**.vertx = Vertx.vertx();
    }
    **return** **this**.vertx;
}

....
<p>

访问JPA的代码基本没有变,主要是RESTful控制器更换了,使用CustomerVerticle替代了CustomerEnpoints。

完整代码见:Convert-To-Vert.x-Web

你可能觉得这比原来Spring代码更复杂了些,这里只是介绍案例源码,真正实战中可以使用Vert.x的元注解库实现类似JAX-RS的REST端点方式,当然最主要是我们获得了更好的可伸缩扩展性,在这段代码底层下面,Vertx使用了Netty作为异步IO操作,这样就可以处理更多并发请求,当然限制于数据库连接池的大小。

上面展示了将前端RESTful的IO从传统的同步模式如Jetty转换到异步IO入Netty方式,我们也可以将访问后端数据库的同步IO放入 Worker Verticles,这样就会更有效率地处理来自客户端的请求,代码可见:Convert-To-Worker-Verticles

原文参考:

Reactive Development Using Vert.x - Java Advent


原网址: 访问
创建于: 2021-01-21 15:09:21
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论