Integrating UDP Messaging with Spring Boot

呱仔
2024-10-20

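Adding dependencies

  1. The Spring Integration framework

  2. Spring Integration's TCP/UDP support

No versions are given below, on the assumption that the project inherits Spring Boot's dependency management.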

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
 
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-ip</artifactId>
</dependency>

Configuration

For the UDP configuration options that Spring Integration supports, see the official documentation: UDP Adapters :: Spring Integration
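
The server and both clients in this post read the port through @Value("${udp.port}"), so that property has to be defined somewhere, for example in application.properties. A minimal fragment; the value 9999 is an arbitrary placeholder, not a value taken from the original setup:

```properties
# Port shared by the UDP server and the clients (placeholder value, pick your own)
udp.port=9999
```

If the property is missing, the application is likely to fail at startup because the ${udp.port} placeholder cannot be resolved.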

Server (Spring Integration)

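import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
public class UdpServer {

    private static final Logger logger = LoggerFactory.getLogger(UdpServer.class);

    @Value("${udp.port}")
    private Integer udpPort;

    // Application service that processes each message. The type name BusinessService
    // is assumed; the original only shows the businessService field in use.
    @Autowired
    private BusinessService businessService;

    /**
     * UDP inbound flow (Java DSL configuration)
     */
    @Bean
    public IntegrationFlow integrationFlow() {
        // Logged when the flow bean is defined, which is before the adapter actually binds
        logger.info("Configuring UDP inbound adapter on port {}", udpPort);
        // Spring Integration 5.x API. On 6.x, where IntegrationFlows is deprecated/removed,
        // IntegrationFlow.from(...) is the replacement.
        return IntegrationFlows.from(Udp.inboundAdapter(udpPort)).channel("udpChannel").get();
    }

    /**
     * Transformer: decodes the raw datagram payload into a String
     */
    @Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
    public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {
        // Decode with an explicit charset instead of the platform default
        String message = new String(payload, StandardCharsets.UTF_8);
        // Convert to upper case
        // message = message.toUpperCase();

        return message;
    }

    /**
     * Filter: returning false drops the message
     */
    @Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
    public boolean filter(String message, @Headers Map<String, Object> headers) {
        // Message ID assigned by the framework
        String id = headers.get("id").toString();
        // Sender IP; can be used for IP-based filtering
        String ip = headers.get("ip_address").toString();
        // Sender port
        String port = headers.get("ip_port").toString();
        // Content-based filtering
        /*if (!message.contains("-")) {
            // Messages without a "-" are dropped
            return false;
        }*/
        return true;
    }

    /**
     * Router: returns the name of the channel that should handle the message
     */
    @Router(inputChannel = "udpRouter")
    public String router(String message, @Headers Map<String, Object> headers) {
        // Message ID assigned by the framework
        String id = headers.get("id").toString();
        // Sender IP; can be used for IP-based routing
        String ip = headers.get("ip_address").toString();
        // Sender port
        String port = headers.get("ip_port").toString();
        // Placeholder: replace with a real condition that selects the handler
        boolean useSecondHandler = false;
        if (useSecondHandler) {
            return "udpHandle2";
        }
        return "udpHandle1";
    }

    /**
     * Final handler 1
     */
    @ServiceActivator(inputChannel = "udpHandle1")
    public void udpMessageHandle(String message) throws Exception {
        // Business logic; this call can be handed off for asynchronous processing
        businessService.udpHandleMethod(message);
        logger.info("UDP1: {}", message);
    }

    /**
     * Final handler 2
     */
    @ServiceActivator(inputChannel = "udpHandle2")
    public void udpMessageHandle2(String message) throws Exception {
        logger.info("UDP2: {}", message);
    }

}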
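
To make the order of the stages concrete, here is a framework-free sketch of the same transformer, filter, router chain. It is not Spring Integration code: the class UdpPipelineSketch and its methods are hypothetical, and both the "-" filter rule (borrowed from the commented-out check above) and the "B-" routing rule are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Plain-Java stand-in for the transformer -> filter -> router chain. All names are hypothetical. */
final class UdpPipelineSketch {

    /** Transformer step: decode the datagram payload with an explicit charset. */
    static String transform(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Filter step: assumed example rule that drops messages without a '-'. */
    static boolean accept(String message) {
        return message.contains("-");
    }

    /** Router step: assumed example rule that picks a handler channel by prefix. */
    static String route(String message) {
        return message.startsWith("B-") ? "udpHandle2" : "udpHandle1";
    }

    /** Runs the three steps in order; an empty result means the filter dropped the message. */
    static Optional<String> process(byte[] payload) {
        String message = transform(payload);
        if (!accept(message)) {
            return Optional.empty();
        }
        return Optional.of(route(message));
    }

    public static void main(String[] args) {
        System.out.println(process("A-hello".getBytes(StandardCharsets.UTF_8))); // Optional[udpHandle1]
        System.out.println(process("B-hello".getBytes(StandardCharsets.UTF_8))); // Optional[udpHandle2]
        System.out.println(process("hello".getBytes(StandardCharsets.UTF_8)));   // Optional.empty
    }
}
```

In the Spring Integration version, each of these steps is a separate annotated method connected by named channels, so the framework does the chaining that process() does here by hand.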


Client (plain Java sockets)

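import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class UdpSimpleClient {
    private static final Logger logger = LoggerFactory.getLogger(UdpSimpleClient.class);

    @Value("${udp.port}")
    private Integer udpPort;

    public void sendMessage(String message) {
        logger.info("Sending UDP message: {}", message);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", udpPort);
        // Encode with an explicit charset instead of the platform default
        byte[] udpMessage = message.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket datagramSocket = new DatagramSocket()) {
            DatagramPacket datagramPacket = new DatagramPacket(udpMessage, udpMessage.length, inetSocketAddress);
            datagramSocket.send(datagramPacket);
            // Only report success when send() did not throw; UDP itself gives no delivery guarantee
            logger.info("Message sent");
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}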
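
The send logic above can be tried without starting the Spring application. The following is a minimal sketch using only the JDK: it binds a receiving socket to an ephemeral loopback port, sends one datagram to it, and reads it back. The class name UdpLoopbackCheck, the 2-second timeout, and the 2048-byte buffer are assumptions made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/** JDK-only loopback check for UDP send/receive. The class name is hypothetical. */
final class UdpLoopbackCheck {

    /** Sends the message to a local receiver and returns what the receiver decoded. */
    static String roundTrip(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port, so the sketch does not collide with a running server
        try (DatagramSocket receiver = new DatagramSocket(new InetSocketAddress(loopback, 0));
             DatagramSocket sender = new DatagramSocket()) {
            // Fail with a timeout instead of blocking forever if the datagram is lost
            receiver.setSoTimeout(2000);

            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(out, out.length, loopback, receiver.getLocalPort()));

            byte[] buffer = new byte[2048];
            DatagramPacket in = new DatagramPacket(buffer, buffer.length);
            receiver.receive(in);
            // Decode only the bytes that were actually received, not the whole buffer
            return new String(in.getData(), in.getOffset(), in.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello-udp"));
    }
}
```

Using the same charset on both ends matters here: if the sender and receiver rely on different platform defaults, non-ASCII payloads can come out garbled.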

Client (Spring Integration)

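import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;

@Configuration
public class UdpIntegrationClientConfig {

    @Value("${udp.port}")
    private Integer udpPort;

    // Outbound handler: messages sent to the "udpOut" channel go out as UDP datagrams
    @Bean
    @ServiceActivator(inputChannel = "udpOut")
    public UnicastSendingMessageHandler unicastSendingMessageHandler() {
        return new UnicastSendingMessageHandler("localhost", udpPort);
    }

}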

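import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class UdpIntegrationClient {

    private static final Logger logger = LoggerFactory.getLogger(UdpIntegrationClient.class);

    @Autowired
    private UnicastSendingMessageHandler unicastSendingMessageHandler;

    public void sendMessage(String message) {
        logger.info("Sending UDP message: {}", message);
        // Calls the handler directly rather than going through the "udpOut" channel
        unicastSendingMessageHandler.handleMessage(MessageBuilder.withPayload(message).build());
        logger.info("Message sent");
    }

}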

References:

Overview of Spring Integration Framework :: Spring Integration

TCP and UDP Support :: Spring Integration

Enabling a UDP service in Spring Boot to send and receive UDP messages - Booom丶 - cnblogs (cnblogs.com)
