1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| import serial import serial.tools.list_ports import time import struct import matplotlib.pyplot as plt import numpy as np import threading
class JY901(): def __init__(self): self.time_last = 0 self.time_now = 0 self.record_angle = [] self.record_omega = [] self.record_inte = [] self.count_yaw = 0 self.count_omega = 0 self.omega_inte = 0 self.yaw_init = 0 self.yaw = 0 self.omega = 0
def extract_raw_data(self, data): length = 9 ext_omega = None ext_angle = None try: st = data.index(b'\x55') if st >= 0: reserved = data[st+1] if reserved == 82: if st+length+1 < len(data): if self.count_omega==0: self.time_now = self.time_last = time.time() else: self.time_last = self.time_now self.time_now = time.time()
ext_omega = data[st:st+length+2] temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True) self.omega = temp/32768*2000 self.record_omega.append(self.omega) data = data[st+length+2:] self.count_omega += 1
self.omega_inte += self.omega*(self.time_now - self.time_last) self.omega_inte = (self.omega_inte+180)%360 - 180 self.record_inte.append(self.omega_inte)
elif reserved == 83: if st+length+1 < len(data): ext_angle = data[st:st+length+2] temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True) self.yaw = temp/32768*180 if self.count_yaw==0: self.yaw_init = self.yaw self.record_angle.append(self.yaw - self.yaw_init) data = data[st+length+2:] self.count_yaw += 1
elif reserved == 81: if st+length+1 < len(data): ext_data = None data = data[st+length+2:] else: data = data[st+2:] except: return data, ext_omega, ext_angle if self.count_yaw%100 == 0 and self.count_yaw>0: self.save_data() print(len(self.record_omega),' extract omega ',ext_omega,self.omega_inte) print(len(self.record_angle),' extract angle ',ext_angle,self.yaw - self.yaw_init) print('*'*30) return data, ext_omega, ext_angle
def serial_readers(self, port = None, size = 22): if not port: port = get_serial_port() total = bytearray() ser = None while True: try: if not ser: ser = serial.Serial(port, 115200, timeout=60) if not ser.is_open: ser.open() tmp = ser.read(size) total.extend(tmp) total, ext_omega, ext_angle = self.extract_raw_data(total) except Exception as e: print("exception ", e) if ser: ser.close() time.sleep(5)
return self.record_omega, self.record_angle
def save_data(self): np.savez('record.npz',record_omega=self.record_omega, record_angle=self.record_angle)
def plot(self): plt.ion() plt.figure() while True: try: if len(self.record_angle)>0 and len(self.record_inte)>0 : plt.clf() plt.plot(list(range(len(self.record_angle))),self.record_angle,'r') plt.plot(list(range(len(self.record_inte))),self.record_inte,'b') plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200) plt.legend(labels=['angle','integration']) plt.pause(0.1) plt.ioff() except Exception as e: print("exception ", e)
def get_serial_port(): ports = list(serial.tools.list_ports.comports()) for port in ports: print(ports.index(port), port) selected = -1 while selected < 0 or selected >= len(ports): print("please input serial to use [start from 0]:") selected = int(input()) print("selected: ", selected)
port = ports[selected] print("use ", port) port = list(port) return port[0]
if __name__ == '__main__': imu = JY901() thread_rcv = threading.Thread(target=imu.serial_readers) thread_plt = threading.Thread(target=imu.plot) thread_plt.start() thread_rcv.start()
|