引入依赖
Spring Integration框架
Spring Integration框架的Tcp/Udp支持
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
</dependency>
配置文件
Spring Integration框架对于UDP的配置,可参考官方文档UDP Adapters :: Spring Integration
服务端(Spring Integration)
@Configuration
public class UdpServer {
private static final Logger logger = LoggerFactory.getLogger(UdpServer.class);
@Value("${udp.port}")
private Integer udpPort;
/**
* UDP消息接收服务(Java DSL Configuration)
*/
@Bean
public IntegrationFlow integrationFlow() {
logger.info("UDP服务启动成功,端口号为: {}", udpPort);
return IntegrationFlows.from(Udp.inboundAdapter(udpPort)).channel("udpChannel").get();
}
/**
* 转换器
*/
@Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {
String message = new String(payload);
// 转换为大写
// message = message.toUpperCase();
return message;
}
/**
* 过滤器
*/
@Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
public boolean filter(String message, @Headers Map<String, Object> headers) {
// 获取来源Id
String id = headers.get("id").toString();
// 获取来源IP,可以进行IP过滤
String ip = headers.get("ip_address").toString();
// 获取来源Port
String port = headers.get("ip_port").toString();
// 信息数据过滤
/*if (message.indexOf("-") < 0) {
// 没有-的数据会被过滤
return false;
}*/
return true;
}
/**
* 路由分发处理器
*/
@Router(inputChannel = "udpRouter")
public String router(String message, @Headers Map<String, Object> headers) {
// 获取来源Id
String id = headers.get("id").toString();
// 获取来源IP,可以进行IP过滤
String ip = headers.get("ip_address").toString();
// 获取来源Port
String port = headers.get("ip_port").toString();
// 筛选,走那个处理器
if (false) {
return "udpHandle2";
}
return "udpHandle1";
}
/**
* 最终处理器1
*/
@ServiceActivator(inputChannel = "udpHandle1")
public void udpMessageHandle(String message) throws Exception {
// 可以进行异步处理
businessService.udpHandleMethod(message);
logger.info("UDP1:" + message);
}
/**
* 最终处理器2
*/
@ServiceActivator(inputChannel = "udpHandle2")
public void udpMessageHandle2(String message) throws Exception {
logger.info("UDP2:" + message);
}
}
客户端(传统写法)
@Service
public class UdpSimpleClient {
private final static Logger logger = LoggerFactory.getLogger(UdpSimpleClient.class);
@Value("${udp.port}")
private Integer udpPort;
public void sendMessage(String message) {
logger.info("发送UDP: {}", message);
InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", udpPort);
byte[] udpMessage = message.getBytes();
DatagramPacket datagramPacket = null;
try (DatagramSocket datagramSocket = new DatagramSocket()) {
datagramPacket = new DatagramPacket(udpMessage, udpMessage.length, inetSocketAddress);
datagramSocket.send(datagramPacket);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
logger.info("发送成功");
}
}
客户端(Spring Integration)
@Configuration
public class UdpIntegrationClientConfig {
@Value("${udp.port}")
private Integer udpPort;
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler unicastSendingMessageHandler() {
UnicastSendingMessageHandler unicastSendingMessageHandler = new UnicastSendingMessageHandler("localhost", udpPort);
return unicastSendingMessageHandler;
}
}
@Service
public class UdpIntegrationClient {
private final static Logger logger = LoggerFactory.getLogger(UdpIntegrationClient.class);
@Autowired
private UnicastSendingMessageHandler unicastSendingMessageHandler;
public void sendMessage(String message) {
logger.info("发送UDP: {}", message);
unicastSendingMessageHandler.handleMessage(MessageBuilder.withPayload(message).build());
logger.info("发送成功");
}
}
参考文章:
Overview of Spring Integration Framework :: Spring Integration
TCP and UDP Support :: Spring Integration
SpringBoot 开启 UDP 服务发送及接收 UDP 协议信息 - Booom丶 - 博客园 (cnblogs.com)
评论区