Vert.x是作为一个事件总线的设计,以保证应用中不同部分以一种非堵塞的线程安全方式通讯,其原理来自于Erlang和Akka,它是能充分利用多核处理器性能并实现高并发编程的需求。
所有Vert.x 的VERTICLE缺省是一个单线程,不像Node.js只有一个单线程,vert.x能在很多线程中运行很多VERTICLE,每个线程一个VERTICLE,这样你可以指定几个VERTICLE作为"worker",能够以多线程方式运行你的任务。
Vert.x可以通过使用Hazelcast实现底层的事件总线的多节点集群。
下面以一个案例说明如何基于 Spring Boot, Spring Data JPA, 和 Spring REST开发一个聊天室系统?
下面没有引入Vert.x而是使用传统Spting语法实现的系统:
@SpringBootApplication
@EnableJpaRepositories
@EnableTransactionManagement
@Slf4j
public class Application {
**public** **static** **void** main(String\[\] args) {
ApplicationContext ctx = SpringApplication.run(Application.**class**, args);
System.out.println("Let's inspect the beans provided by Spring Boot:");
String\[\] beanNames = ctx.getBeanDefinitionNames();
Arrays.sort(beanNames);
**for** (String beanName : beanNames) {
System.out.println(beanName);
}
}
@Bean
**public** DataSource dataSource() {
EmbeddedDatabaseBuilder builder = **new** EmbeddedDatabaseBuilder();
**return** builder.setType(EmbeddedDatabaseType.HSQL).build();
}
@Bean
**public** EntityManagerFactory entityManagerFactory() {
HibernateJpaVendorAdapter vendorAdapter = **new** HibernateJpaVendorAdapter();
vendorAdapter.setGenerateDdl(**true**);
LocalContainerEntityManagerFactoryBean factory = **new** LocalContainerEntityManagerFactoryBean();
factory.setJpaVendorAdapter(vendorAdapter);
factory.setPackagesToScan("com.zanclus.data.entities");
factory.setDataSource(dataSource());
factory.afterPropertiesSet();
**return** factory.getObject();
}
@Bean
**public** PlatformTransactionManager transactionManager(**final** EntityManagerFactory emf) {
**final** JpaTransactionManager txManager = **new** JpaTransactionManager();
txManager.setEntityManagerFactory(emf);
**return** txManager;
}
}
<p>
上述代码中 @Bean是提供了访问JPA的EntityManager, TransactionManager, 和 DataSource. 这是一个标准的Springboot案例源码。前端使用CustomerEnpoints作为RESTful的控制器,接受客户端的请求。
CustomerVerticle 类是作为@Component,意味着在启动时,Spring会初始化这个类,它也有@PostConstruct标注的start方法,这样Verticle 在启动被加载时会执行其方法内容:
@PostConstruct
**public** **void** start() throws Exception {
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.get("/v1/customer/:id")
.produces("application/json")
.blockingHandler(**this**::getCustomerById);
router.put("/v1/customer")
.consumes("application/json")
.produces("application/json")
.blockingHandler(**this**::addCustomer);
router.get("/v1/customer")
.produces("application/json")
.blockingHandler(**this**::getAllCustomers);
vertx.createHttpServer().requestHandler(router::accept).listen(8080);
}
<p>
在这个启动方法中,引入了vertx-web库包:Router,能够让用户定义将请求过滤转换为HTTP URLs, methods, 和头部header 等信息,BodyHandler 是能将POST/PUT提交的内容转变为一个JSON对象,Vert.x能够将其作为RoutingContext的一部分进行处理;RoutingContext包含Vert.x请求对象和响应对象和任何Http请求中数据,包括POST内容数据等等,blockingHandler是接受RoutingContext作为输入参数。
注意到blockingHandler方法使用了Java 8的方法引用,如this::getCustomerById,这比使用lambda将逻辑插入blockingHandler 中更具有代码可读性。getCustomerById的方法如下:
private void getCustomerById(RoutingContext rc) {
log.info("Request for single customer");
Long id = Long.parseLong(rc.request().getParam("id"));
**try** {
Customer customer = dao.findOne(id);
**if** (customer==**null**) {
rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");
} **else** {
rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));
}
} **catch** (JsonProcessingException jpe) {
rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");
log.error("Server error", jpe);
}
}
<p>
在getCustomerById方法中实现真正的业务处理,比如调用DAO访问数据库,序列化和反序列化对象,类似在MVC的控制器中实现一样。
完整见Github
这段代码的主要问题是性能的扩展性有限,这段代码要么可以在Tomcat中运行,要么以微服务方式在嵌入式服务器如Jetty或undertow中运行,总之,只能是一个请求捆绑一个线程,当在等待I/O等堵塞操作时,所有的资源都会消耗在等待上,这种局部堵塞引发整体等待是一种资源浪费。
而如果我们引入了Vert.x,使用@Bean注入了Vertx实例:
public class Application {
**public** **static** **void** main(String\[\] args) {
SpringApplication.run(Application.**class**, args);
}
**private** Vertx vertx;
_/\*\*
\* Create an {@link ObjectMapper} for use in (de)serializing objects to/from JSON
\* @return An instance of {@link ObjectMapper}
*/_ @Bean
**public** ObjectMapper objectMapper() {
**return** **new** ObjectMapper(**new** JsonFactory());
} _/\*\*
\* A singleton instance of {@link Vertx} which is used throughout the application
\* @return An instance of {@link Vertx}
*/_ @Bean
**public** Vertx getVertxInstance() {
**if** (**this**.vertx==**null**) {
**this**.vertx = Vertx.vertx();
}
**return** **this**.vertx;
}
....
<p>
访问JPA的代码基本没有变,主要是RESTful控制器更换了,使用CustomerVerticle替代了CustomerEnpoints。
完整代码见:Convert-To-Vert.x-Web
你可能觉得这比原来Spring代码更复杂了些,这里只是介绍案例源码,真正实战中可以使用Vert.x的元注解库实现类似JAX-RS的REST端点方式,当然最主要是我们获得了更好的可伸缩扩展性,在这段代码底层下面,Vertx使用了Netty作为异步IO操作,这样就可以处理更多并发请求,当然限制于数据库连接池的大小。
上面展示了将前端RESTful的IO从传统的同步模式如Jetty转换到异步IO入Netty方式,我们也可以将访问后端数据库的同步IO放入 Worker Verticles,这样就会更有效率地处理来自客户端的请求,代码可见:Convert-To-Worker-Verticles
原文参考:
Reactive Development Using Vert.x - Java Advent
原网址: 访问
创建于: 2021-01-21 15:09:21
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论