JY901串口数据接收与处理(Python)

最近在用JY901做一些实验,关于JY901网上有很多资料了,也有上位机软件,可以方便的查看输出数据。我想做的是对输出的角速度进行积分,对比积分后的结果与输出的角度,如果数据都比较准确地话,那么他们应该相差不大。

这篇文章里,要完成的事情就是通过串口接收他输出的角速度和角度,然后对角速度进行积分,并实时显示数据结果。下面我首先对各个部分进行分块解释,完整的代码放在最后。

1. 串口通信

python实现串口通信可以用 pySerial 库。我们首先选择串口对应的设备端口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 获取串口设备对应的端口
def get_serial_port():
ports = list(serial.tools.list_ports.comports())
for port in ports:
print(ports.index(port), port)
selected = -1
while selected < 0 or selected >= len(ports):
print("please input serial to use [start from 0]:")
selected = int(input())
print("selected: ", selected)

port = ports[selected]
print("use ", port)
port = list(port)
return port[0]

然后打开串口并持续接收数据,在收到的数据中提取IMU数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def serial_readers(self, port = None, size = 22):
# port : 串口端口
# size : 默认一次读取数据长度(bytes)
if not port:
port = get_serial_port()
total = bytearray()
ser = None
while True:
try:
if not ser:
ser = serial.Serial(port, 115200, timeout=60)
if not ser.is_open:
ser.open()
tmp = ser.read(size)
total.extend(tmp)
total, ext_omega, ext_angle = self.extract_raw_data(total) #提取IMU数据
except Exception as e:
print("exception ", e)
if ser:
ser.close()
time.sleep(5)

return self.record_omega, self.record_angle

2. IMU数据提取

首先我们来看JY901串口数据的帧结构:

1
2
3
加速度:0x55 0x51 AxL AxH AyL AyH AzL AzH TL TH SUM 
角速度:0x55 0x52 wxL wxH wyL wyH wzL wzH TL TH SUM
角度: 0x55 0x53 RollL RollH PitchL PitchH YawL YawH TL TH SUM

这三种数据帧长度都是 11 Bytes,最后一位为校验位,我们从串口数据中提取IMU数据也是根据这个帧结构,先检测帧头,然后读取整个帧(代码里偷了懒,并没有核对校验位)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def extract_raw_data(self, data):
# 根据报文格式读取角速度数据
# 0x55 0x52 + data + CRC(8bits)
length = 9
ext_omega = None
ext_angle = None
try:
st = data.index(b'\x55') # 寻找报文引导字
if st >= 0:
reserved = data[st+1] # 数据类型
if reserved == 82: # 0x52,提取角速度消息
if st+length+1 < len(data):
if self.count_omega==0:
self.time_now = self.time_last = time.time()
else:
self.time_last = self.time_now
self.time_now = time.time()

ext_omega = data[st:st+length+2]
temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
self.omega = temp/32768*2000
self.record_omega.append(self.omega) #deg/s
data = data[st+length+2:]
self.count_omega += 1

self.omega_inte += self.omega*(self.time_now - self.time_last)
self.omega_inte = (self.omega_inte+180)%360 - 180
self.record_inte.append(self.omega_inte)

elif reserved == 83: # 0x53,提取角度消息
if st+length+1 < len(data):
ext_angle = data[st:st+length+2]
temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
self.yaw = temp/32768*180
if self.count_yaw==0: #记录初始相位
self.yaw_init = self.yaw
self.record_angle.append(self.yaw - self.yaw_init) #deg
data = data[st+length+2:]
self.count_yaw += 1

elif reserved == 81: # 0x51, 丢弃加速度消息
if st+length+1 < len(data):
ext_data = None
data = data[st+length+2:]
else:
data = data[st+2:]
except:
return data, ext_omega, ext_angle
if self.count_yaw%100 == 0 and self.count_yaw>0:
self.save_data()
print(len(self.record_omega),' extract omega ',ext_omega,self.omega_inte)
print(len(self.record_angle),' extract angle ',ext_angle,self.yaw - self.yaw_init)
print('*'*30)
return data, ext_omega, ext_angle

3. 绘图显示

我把历史接收数据都存在一个 list 里面了,每过 0.1s 就重新绘图,看起来就像是在实时更新一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def plot(self):
plt.ion()
plt.figure()
while True:
try:
if len(self.record_angle)>0 and len(self.record_inte)>0 :
plt.clf()
plt.plot(list(range(len(self.record_angle))),self.record_angle,'r')
plt.plot(list(range(len(self.record_inte))),self.record_inte,'b')
plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200)
plt.legend(labels=['angle','integration'])
plt.pause(0.1)
plt.ioff()
except Exception as e:
print("exception ", e)

4. 运行效果

5. 完整代码

为了便于共享数据,定义了一个类 JY901,将上面的各个函数放在这个类里面,输出的数据也都记录在 JY901 的成员变量里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import serial
import serial.tools.list_ports
import time
import struct
import matplotlib.pyplot as plt
import numpy as np
import threading

class JY901():
def __init__(self):
self.time_last = 0
self.time_now = 0
self.record_angle = []
self.record_omega = []
self.record_inte = []
self.count_yaw = 0
self.count_omega = 0
self.omega_inte = 0 #角速度积分
self.yaw_init = 0
self.yaw = 0
self.omega = 0

def extract_raw_data(self, data):
# 根据报文格式读取角速度数据
# 0x55 0x52 + data + CRC(8bits)
length = 9
ext_omega = None
ext_angle = None
try:
st = data.index(b'\x55') # 寻找报文引导字
if st >= 0:
reserved = data[st+1] # 数据类型
if reserved == 82: # 0x52,提取角速度消息
if st+length+1 < len(data):
if self.count_omega==0:
self.time_now = self.time_last = time.time()
else:
self.time_last = self.time_now
self.time_now = time.time()

ext_omega = data[st:st+length+2]
temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
self.omega = temp/32768*2000
self.record_omega.append(self.omega) #deg/s
data = data[st+length+2:]
self.count_omega += 1

self.omega_inte += self.omega*(self.time_now - self.time_last)
self.omega_inte = (self.omega_inte+180)%360 - 180
self.record_inte.append(self.omega_inte)

elif reserved == 83: # 0x53,提取角度消息
if st+length+1 < len(data):
ext_angle = data[st:st+length+2]
temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
self.yaw = temp/32768*180
if self.count_yaw==0: #记录初始相位
self.yaw_init = self.yaw
self.record_angle.append(self.yaw - self.yaw_init) #deg
data = data[st+length+2:]
self.count_yaw += 1

elif reserved == 81: # 0x51, 丢弃加速度消息
if st+length+1 < len(data):
ext_data = None
data = data[st+length+2:]
else:
data = data[st+2:]
except:
return data, ext_omega, ext_angle
if self.count_yaw%100 == 0 and self.count_yaw>0:
self.save_data()
print(len(self.record_omega),' extract omega ',ext_omega,self.omega_inte)
print(len(self.record_angle),' extract angle ',ext_angle,self.yaw - self.yaw_init)
print('*'*30)
return data, ext_omega, ext_angle

def serial_readers(self, port = None, size = 22):
# port : 串口端口
# size : 默认一次读取数据长度(bytes)
if not port:
port = get_serial_port()
total = bytearray()
ser = None
while True:
try:
if not ser:
ser = serial.Serial(port, 115200, timeout=60)
if not ser.is_open:
ser.open()
tmp = ser.read(size)
total.extend(tmp)
total, ext_omega, ext_angle = self.extract_raw_data(total)
except Exception as e:
print("exception ", e)
if ser:
ser.close()
time.sleep(5)

return self.record_omega, self.record_angle

def save_data(self):
np.savez('record.npz',record_omega=self.record_omega, record_angle=self.record_angle)

def plot(self):
plt.ion()
plt.figure()
while True:
try:
if len(self.record_angle)>0 and len(self.record_inte)>0 :
plt.clf()
plt.plot(list(range(len(self.record_angle))),self.record_angle,'r')
plt.plot(list(range(len(self.record_inte))),self.record_inte,'b')
plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200)
plt.legend(labels=['angle','integration'])
plt.pause(0.1)
plt.ioff()
except Exception as e:
print("exception ", e)


def get_serial_port():
ports = list(serial.tools.list_ports.comports())
for port in ports:
print(ports.index(port), port)
selected = -1
while selected < 0 or selected >= len(ports):
print("please input serial to use [start from 0]:")
selected = int(input())
print("selected: ", selected)

port = ports[selected]
print("use ", port)
port = list(port)
return port[0]


if __name__ == '__main__':
imu = JY901()
thread_rcv = threading.Thread(target=imu.serial_readers)
thread_plt = threading.Thread(target=imu.plot)
thread_plt.start()
thread_rcv.start()

JY901串口数据接收与处理(Python)
https://glooow1024.github.io/2020/07/09/hardware/jy901/
作者
Glooow
发布于
2020年7月9日
许可协议