1.modbus协议介绍
modbus通讯协议详解
2.常用功能码
功能码
名称
功能
对应的地址类型
01
读线圈状态
读位(读N个bit)—读从机线圈寄存器,位操作
0x
02
读输入离散量
读位(读N个bit)—读离散输入寄存器,位操作
1x
03
读多个寄存器
读整型、字符型、状态字、浮点型(读N个words)—读保持寄存器,字节操作
4X
04
读输入寄存器
读整型、状态字、浮点型(读N个words)—读输入寄存器,字节操作
3x
05
写单个线圈
写位(写一个bit)—写线圈寄存器,位操作
0x
06
写单个保持寄存器
写整型、字符型、状态字、浮点型(写一个word)—写保持寄存器,字节操作
4x
0F
写多个线圈
写位(写n个bit)—强置一串连续逻辑线圈的通断
0x
10
写多个保持寄存器
写整形、字符型、状态字、浮点型(写n个word)—把具体的二进制值装入一串连续的保持寄存器
4x
3.具体示例:
从机1
请求消息:
010300c80004c5f7(十六进制)
解释:ID为01的从机,从201位开始,读取4位数据,也就是读取栋号、楼层、单元、设备ID(从机地址)
响应消息:
01030800280042003A0001A417(十六进制)
解释:ID为01的从机,从201开始读取到的4位数据,也就是栋号40、楼层66、单元58、设备ID(从机地址)1
4.代码实现
通讯框架:
Java实现ModbusTCP通信
注意:因实际项目中使用了tcp透传的网关,上述框架都不适用,所以直接采用netty通讯框架。
引入依赖:
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
applicaion.yaml配置监听端口:
netty:
port: 45945
连接类
package com.dashan.heating.common.tcp;
import com.dashan.heating.common.utils.LogUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
@Component
public class NettyServer {
/**
* boss 线程组用于处理连接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* work 线程组用于数据处理
*/
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.port}")
private Integer port;
/**
* 启动Netty Server
*
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(56613))
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ServerChannelInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
LogUtil.info("启动 Netty Server");
}
}
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
LogUtil.info("关闭Netty");
}
}
消息处理类:
package com.dashan.heating.common.tcp;
import com.dashan.heating.common.config.KeyConfig;
import com.dashan.heating.common.redis.utils.RedisUtil;
import com.dashan.heating.common.tcp.util.ConvertCode;
import com.dashan.heating.common.utils.LogUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Resource
private RedisUtil redisUtil;
private static NettyServerHandler nettyServerHandler;
/**
* redis key过期时间,单位是秒
*/
private static final Integer EXPIRE_TIME = 90;
@PostConstruct
private void init() {
nettyServerHandler = this;
}
/**
* 写数据方法
* @param serial 序列号
* @param msg 数据(16进制字符串)
*/
public void write(String serial, String msg) {
// 获取序列号对应的uid
String uid = (String) nettyServerHandler.redisUtil.get(KeyConfig.SERIAL_KEY_VALUE + serial);
if (uid != null) {
try {
Channel channel = ChannelMap.getTimeServerChannel(uid);
writeToClient(msg, channel);
} catch (Exception e) {
e.printStackTrace();
}
} else {
// TODO redis保存的channel已经过期,表示设备长时间未收到心跳,当离线处理
// 删除uid对应的key和序列号对应的key
nettyServerHandler.redisUtil.del(KeyConfig.UID_KEY_VALUE + uid);
nettyServerHandler.redisUtil.del(KeyConfig.SERIAL_KEY_EXPIRE + serial);
nettyServerHandler.redisUtil.del(KeyConfig.SERIAL_KEY_VALUE + serial);
}
}
/**
* 公用回写数据到客户端的方法
*
* @param receiveStr
* @param channel
*/
private void writeToClient(final String receiveStr, Channel channel) {
try {
// netty需要用ByteBuf传输
ByteBuf bufff = Unpooled.buffer();
// 对接需要16进制
bufff.writeBytes(ConvertCode.hexString2Bytes(receiveStr));
channel.writeAndFlush(bufff).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
LogUtil.info("回写成功:" + receiveStr);
} else {
LogUtil.info("回写失败:" + receiveStr);
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println("调用通用writeToClient()异常" + e.getMessage());
}
}
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
LogUtil.info("客户端已连接到服务器:{}", ctx);
ctx.flush();
}
/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
int length = buf.readableBytes();
byte[] bytes = new byte[length];
int i = 0;
while (i < length) {
bytes[i] = buf.readByte();
buf.markReaderIndex();
i++;
}
String str = new String(bytes, StandardCharsets.UTF_8);
if ("Q".equals(str)) {
// 处理心跳
LogUtil.info("服务器收到心跳: {},客户端为:{}", str, ctx);
String uuid = ctx.channel().id().asLongText();
String serial = (String) nettyServerHandler.redisUtil.get(KeyConfig.UID_KEY_VALUE + uuid);
nettyServerHandler.redisUtil.set(KeyConfig.SERIAL_KEY_EXPIRE + serial, uuid, EXPIRE_TIME);
// 回复字符串 A 防止设备自检测导致重连
ByteBuf buff = Unpooled.buffer();
ctx.writeAndFlush(buff.writeBytes("A".getBytes(StandardCharsets.UTF_8)));
} else if (str.indexOf("ZR") == 0) {
// 处理首次连接
String uuid = ctx.channel().id().asLongText();
LogUtil.info("设备首次连接,序列号为: {},uuid为:{},客户端为: {}", str, uuid, ctx);
// 保存通道
ChannelMap.addTimeServerChannel(uuid, ctx.channel());
// 冗余uid和序列号对应数据,方便过期删除
nettyServerHandler.redisUtil.set(KeyConfig.SERIAL_KEY_EXPIRE + str, uuid, EXPIRE_TIME);
nettyServerHandler.redisUtil.set(KeyConfig.SERIAL_KEY_VALUE + str, uuid);
nettyServerHandler.redisUtil.set(KeyConfig.UID_KEY_VALUE + uuid, str);
} else {
// 处理响应消息
String s1 = ConvertCode.bytes2HexString(bytes);
//对接需要16进制
LogUtil.info("服务器收到消息: {},客户端为:{}", s1, ctx);
}
}
/**
* 客户端主动断开服务端的连接会触发
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
LogUtil.info("客户端主动与服务器断开连接:{}", ctx);
String uuid = ctx.channel().id().asLongText();
ChannelMap.removeTimeServerChannel(uuid);
// 获取序列号
String serial = (String) nettyServerHandler.redisUtil.get(KeyConfig.UID_KEY_VALUE + uuid);
nettyServerHandler.redisUtil.del(KeyConfig.SERIAL_KEY_VALUE + serial);
nettyServerHandler.redisUtil.del(KeyConfig.SERIAL_KEY_EXPIRE + serial);
nettyServerHandler.redisUtil.del(KeyConfig.UID_KEY_VALUE + uuid);
}
/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常
cause.printStackTrace();
//关闭Channel
ctx.close();
}
}
初始化配置类
package com.dashan.heating.common.tcp;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
//添加编解码(因项目中有十六进制传输,又有字符串传输,所以没有用编解码)
// socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}
保存通道的类(netty下发消息时需要用到通道,并且每次建立连接都会重新创建,所以需要维护通道集):
package com.dashan.heating.common.tcp;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ChannelMap {
private static Map<String, Channel> channelMap = new ConcurrentHashMap<String, Channel>();
public static void addTimeServerChannel(String id, Channel sc) {
channelMap.put(id, sc);
}
public static Map<String, Channel> getAllChannels() {
return channelMap;
}
public static Channel getTimeServerChannel(String id) {
return channelMap.get(id);
}
public static void removeTimeServerChannel(String id) {
channelMap.remove(id);
System.out.println("size = " + channelMap.size());
}
}
字符转换类:
package com.dashan.heating.common.tcp.util;
public class ConvertCode {
/**
* @param b 字节数组
* @return 16进制字符串
* @throws
* @Title:bytes2HexString
* @Description:字节数组转16进制字符串
*/
public static String bytes2HexString(byte[] b) {
StringBuffer result = new StringBuffer();
String hex;
for (int i = 0; i < b.length; i++) {
hex = Integer.toHexString(b[i] & 0xFF);
if (hex.length() == 1) {
hex = '0' + hex;
}
result.append(hex.toUpperCase());
}
return result.toString();
}
/**
* @param hexString 16进制字符串
* @return 字节数组
* @Title:hexString2Bytes
* @Description:16进制字符串转字节数组
*/
public static byte[] hexString2Bytes(String hexString) {
if (hexString == null || hexString.equals("")) {
return null;
}
hexString = hexString.toUpperCase();
int length = hexString.length() / 2;
char[] hexChars = hexString.toCharArray();
byte[] d = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
}
return d;
}
private static byte charToByte(char c) {
return (byte) "0123456789ABCDEF".indexOf(c);
}
/**
* @param strPart 字符串
* @return 16进制字符串
* @Title:string2HexString
* @Description:字符串转16进制字符串
*/
public static String string2HexString(String strPart) {
StringBuffer hexString = new StringBuffer();
for (int i = 0; i < strPart.length(); i++) {
int ch = strPart.charAt(i);
String strHex = Integer.toHexString(ch);
hexString.append(strHex);
}
return hexString.toString();
}
/**
* @param src 16进制字符串
* @return 字节数组
* @throws
* @Title:hexString2String
* @Description:16进制字符串转字符串
*/
public static String hexString2String(String src) {
String temp = "";
for (int i = 0; i < src.length() / 2; i++) {
//System.out.println(Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue());
temp = temp + (char) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
}
return temp;
}
}
CRC计算类:
package com.dashan.heating.common.tcp.util;
import java.nio.charset.StandardCharsets;
/**
* 基于Modbus CRC16的校验算法工具类
*/
public class Crc16Util {
/**
* 获取源数据和验证码的组合byte数组
*
* @param strings 可变长度的十六进制字符串
* @return
*/
public static byte[] getData(String... strings) {
byte[] data = new byte[]{};
for (int i = 0; i < strings.length; i++) {
int x = Integer.parseInt(strings[i], 16);
byte n = (byte) x;
byte[] buffer = new byte[data.length + 1];
byte[] aa = {n};
System.arraycopy(data, 0, buffer, 0, data.length);
System.arraycopy(aa, 0, buffer, data.length, aa.length);
data = buffer;
}
return getData(data);
}
/**
* 获取源数据和验证码的组合byte数组
*
* @param aa 字节数组
* @return
*/
private static byte[] getData(byte[] aa) {
byte[] bb = getCrc16(aa);
byte[] cc = new byte[aa.length + bb.length];
System.arraycopy(aa, 0, cc, 0, aa.length);
System.arraycopy(bb, 0, cc, aa.length, bb.length);
return cc;
}
/**
* 获取验证码byte数组,基于Modbus CRC16的校验算法
*/
private static byte[] getCrc16(byte[] arr_buff) {
int len = arr_buff.length;
// 预置 1 个 16 位的寄存器为十六进制FFFF, 称此寄存器为 CRC寄存器。
int crc = 0xFFFF;
int i, j;
for (i = 0; i < len; i++) {
// 把第一个 8 位二进制数据 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器
crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));
for (j = 0; j < 8; j++) {
// 把 CRC 寄存器的内容右移一位( 朝低位)用 0 填补最高位, 并检查右移后的移出位
if ((crc & 0x0001) > 0) {
// 如果移出位为 1, CRC寄存器与多项式A001进行异或
crc = crc >> 1;
crc = crc ^ 0xA001;
} else
// 如果移出位为 0,再次右移一位
crc = crc >> 1;
}
}
return intToBytes(crc);
}
/**
* 将int转换成byte数组,低位在前,高位在后
* 改变高低位顺序只需调换数组序号
*/
private static byte[] intToBytes(int value) {
byte[] src = new byte[2];
src[1] = (byte) ((value >> 8) & 0xFF);
src[0] = (byte) (value & 0xFF);
return src;
}
/**
* 将字节数组转换成十六进制字符串
*/
public static String byteTo16String(byte[] data) {
StringBuffer buffer = new StringBuffer();
for (byte b : data) {
buffer.append(byteTo16String(b));
}
return buffer.toString();
}
/**
* 将字节转换成十六进制字符串
* int转byte对照表
* [128,255],0,[1,128)
* [-128,-1],0,[1,128)
*/
public static String byteTo16String(byte b) {
StringBuffer buffer = new StringBuffer();
int aa = (int) b;
if (aa < 0) {
buffer.append(Integer.toString(aa + 256, 16) + " ");
} else if (aa == 0) {
buffer.append("00 ");
} else if (aa > 0 && aa <= 15) {
buffer.append("0" + Integer.toString(aa, 16) + " ");
} else if (aa > 15) {
buffer.append(Integer.toString(aa, 16) + " ");
}
return buffer.toString();
}
}
测试接口
@PostMapping("public/test")
public String test() {
// 从机地址
String address = "01";
// 功能码,读取保持寄存器
String code = "03";
// 开始位置 201
String[] start = {"00", "c8"};
// 数据长度,读取4位
String[] length = {"00", "04"};
// 计算CRC,得出完整消息
byte[] data = Crc16Util.getData(address, code, start[0], start[1], length[0], length[1]);
String msg = Crc16Util.byteTo16String(data).replaceAll(" ", "");
nettyServerHandler.write("ZR00000000WTYG39", msg);
return ReturnBody.success();
}
redis过期处理
package com.dashan.heating.common.redis.config;
import com.dashan.heating.common.redis.listener.KeyExpiredListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
@Bean
public KeyExpiredListener keyExpiredListener() {
return new KeyExpiredListener(this.redisMessageListenerContainer());
}
}
package com.dashan.heating.common.redis.listener;
import java.nio.charset.StandardCharsets;
import com.dashan.heating.common.config.KeyConfig;
import com.dashan.heating.common.redis.utils.RedisUtil;
import com.dashan.heating.common.tcp.ChannelMap;
import com.dashan.heating.common.utils.LogUtil;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import javax.annotation.Resource;
/**
* redis key过期时间监听
*/
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
@Resource
private RedisUtil redisUtil;
public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
//过期的key
String key = new String(message.getBody(), StandardCharsets.UTF_8);
LogUtil.info("redis key 过期:pattern={},channel={},key={}", new String(pattern), channel, key);
if (key.startsWith(KeyConfig.SERIAL_KEY_EXPIRE)) {
//TODO netty通道过期,处理对应的设备离线逻辑
String serial = key.replace(KeyConfig.SERIAL_KEY_EXPIRE, "");
// 获取uid
String uid = (String) redisUtil.get(KeyConfig.SERIAL_KEY_VALUE + serial);
// 删除uid相关数据及对应的通道
redisUtil.del(KeyConfig.UID_KEY_VALUE + uid);
redisUtil.del(KeyConfig.SERIAL_KEY_VALUE + serial);
ChannelMap.removeTimeServerChannel(uid);
}
}
}
原网址: 访问
创建于: 2023-08-31 10:22:10
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论