←←←←←←←←←←←← 快!点关注
让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务。首先,Spring Cloud Stream首先有什么好处?因为Spring AMPQ提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此repo,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?
让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建SimpleMessageListenerContainer并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:
spring.cloud.stream.bindings.citiesChannel.destination=streamInput  
spring.cloud.stream.bindings.citiesChannel.group=cities  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities
spring.cloud.stream.bindings.personsChannel.destination=streamInput  
spring.cloud.stream.bindings.personsChannel.group=persons  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons  
在类路径上使用RabbitMQ Binder,每个目标都映射到TopicExchange。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。
spring.cloud.stream.bindings.citiesChannel.destination = streamInput   
spring.cloud.stream.bindings.personsChannel.destination = streamInput  
现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。
因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道
# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land.  
spring.cloud.stream.bindings.citiesChannel.group=cities 
# Durable subscription, of course.  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true 
# AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings).  
# Only messages with routingKey = 'cities' will land here.  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities 
spring.cloud.stream.bindings.personsChannel.group=persons  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons  
好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。
<destination>.<group>是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:
package com.example.spring.cloud.configuration;
import org.springframework.cloud.stream.annotation.Input;  
import org.springframework.messaging.SubscribableChannel;
/**  
 \* Created by tomask79 on 30.03.17.  
 */  
public interface SinkRabbitAPI {
    String INPUT_CITIES = "citiesChannel";
    String INPUT_PERSONS = "personsChannel";
    @Input(SinkRabbitAPI.INPUT_CITIES)  
    SubscribableChannel citiesChannel();
    @Input(SinkRabbitAPI.INPUT_PERSONS)  
    SubscribableChannel personsChannel();  
}  
Spring Boot启动时加载这个属性
package com.example.spring.cloud;
import com.example.spring.cloud.configuration.SinkRabbitAPI;  
import com.example.spring.cloud.configuration.SourceRabbitAPI;  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
import org.springframework.cloud.stream.annotation.EnableBinding;  
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication  
@EnableBinding({SinkRabbitAPI.class})  
public class StreamingApplication {
    public static void main(String\[\] args) {  
        SpringApplication.run(StreamingApplication.class, args);  
    }  
}  
在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:
import com.example.spring.cloud.configuration.SinkRabbitAPI;  
import com.example.spring.cloud.configuration.SourceRabbitAPI;  
import org.springframework.cloud.stream.annotation.StreamListener;  
import org.springframework.integration.support.MessageBuilder;  
import org.springframework.messaging.MessageChannel;  
import org.springframework.messaging.handler.annotation.SendTo;  
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**  
 \* Created by tomask79 on 30.03.17.  
 */  
@Service  
public class ProcessingAMPQEndpoint {
    @StreamListener(SinkRabbitAPI.INPUT_CITIES)  
    public void processCity(final String city) {  
        System.out.println("Trying to process input city: "+city);  
    }
    @StreamListener(SinkRabbitAPI.INPUT_PERSONS)  
    public void processPersons(final String person) {  
        System.out.println("Trying to process input person: "+person);  
    }  
}  
RabbitMQ绑定器和代理配置
Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:
spring:  
  cloud:  
    stream:  
      bindings:  
        ...  
      binders:  
          rabbitbinder:  
            type: rabbit  
            environment:  
              spring:  
                rabbitmq:  
                  host: rabbitmq  
                  port: 5672  
                  username: XXX  
                  password: XXX  
测试消息消费
Started StreamingApplication in 6.513 seconds (JVM running for 6.92)   
Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd  
Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd  
您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:
spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6  
这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:
 @StreamListener(SinkRabbitAPI.INPUT_PERSONS)  
    public void processPersons(final String person) {  
        System.out.println("Trying to process input person: "+person);  
        throw new RuntimeException();  
    }  
如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
 Retry Policy Exhausted  
        at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover  
(RejectAndDontRequeueRecoverer.java:45) ~\[spring-rabbit-1.7.0.RELEASE.jar! /:na\]  
        at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc         
建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为Java中的AMPQ基础架构编写样板代码。
秃顶程序员的不易,看到这里,点了关注吧!  
点关注,不迷路,持续更新!!!
Original url: Access
Created at: 2019-03-11 11:48:10
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论