PythonとFuncAnimation

import pyaudio
import numpy as np
import matplotlib.pyplot as plt
import time
import sys
import matplotlib.animation as animation
from scipy.signal import hilbert
from scipy.signal import medfilt
from sklearn.cluster import KMeans

CHUNK    = 16384
FORMAT   = pyaudio.paInt16
CHANNELS = 1
RATE     = 8000
NMEDIAN  = 51
NHIST    = 100

duration_max = 1000

write_count = 0
read_count = 0

count = 0
trig_count = 0
t_up       = 0
t_down     = 0
y3_b4  = 0
up         = []
down       = []

x  = np.arange(CHUNK)
y  = np.zeros(CHUNK)
y1 = np.zeros(CHUNK)
y2 = np.zeros(CHUNK)
y3 = np.zeros(CHUNK)
#xx = np.arange(CHUNK*2)
yy = np.zeros(CHUNK*2)
y_buf  = np.zeros(CHUNK*2)

p = pyaudio.PyAudio()

fig = plt.figure(1, figsize=(8, 12))
fig.subplots_adjust(hspace=0.7, wspace=0.1)

ax1 = fig.add_subplot(9,1,1)
plt.title('input signal')
ax2 = fig.add_subplot(9,1,2)
plt.title('median filtered envelope')
ax3 = fig.add_subplot(9,1,3)
plt.title('binary signal')
ax4 = fig.add_subplot(9,1,4)
plt.title('Dot/Dash histogram')
ax5 = fig.add_subplot(9,1,5)
plt.title('Spaces histogram')

for ax in fig.axes:
    ax.grid(True)

ax1.set_ylim(-6000, 6000)
ax2.set_ylim(-1000, 6000)
ax3.set_ylim(-0.1, 1.1)
line1, = ax1.plot(x, y, color='blue')
line2, = ax2.plot(x, y, color='green', lw=2)
line3, = ax3.plot(x, y, color='orange', lw=2)

def callback(in_data, frame_count, time_info, status):
    global y_buf
    global write_count

    z = np.frombuffer(in_data, dtype="int16")

    for i in range(CHUNK):
        y_buf[i      ] = y_buf[i+CHUNK]
        y_buf[i+CHUNK] = z    [i] 
    write_count += 1

    return (None, pyaudio.paContinue)

stream  = p.open(format=FORMAT, channels=CHANNELS, rate=RATE,
                input=True, frames_per_buffer=CHUNK,
                stream_callback=callback)

stream.start_stream()

def animate_init():
    line1.set_ydata(y)
    line2.set_ydata(y)
    line3.set_ydata(y)
    return line1, line2, line3

def animate(i):
    global read_count, y1, y2
    global count, trig_count, t_up, t_down, y3_b4, up, down

    while write_count > read_count:
        line1.set_ydata(y_buf)
        y4 = medfilt(np.abs(hilbert(y_buf)), NMEDIAN)
        for i in range(CHUNK):
            y1[i] = y_buf[i+CHUNK//2]
            y2[i] = y4   [i+CHUNK//2]
        th  = 0.5 * y2.max()

        for i in range(CHUNK):
            if y2[i] > th:
                y3[i] = 1
            else:
                y3[i] = 0

        for i in range(CHUNK):
            if y3_b4 == 0 and y3[i] == 1:
                trig_count += 1
                t_up = i+read_count*CHUNK
                t_down_duration = t_up - t_down
                if trig_count > 1: # not the first time
                    down.append(t_down_duration)
                    if len(down) == NHIST:
                        ax4.hist(up,   density=False, alpha=1.0, range=(0,duration_max), bins=50, color="coral")
                        ax5.hist(down, density=False, alpha=1.0, range=(0,duration_max), bins=50, color="blue")
                        up = []
                        down = []
            if y3_b4 == 1 and y3[i] == 0 and  trig_count > 0:
                t_down = i+read_count*CHUNK
                t_up_duration = t_down - t_up
                up.append(t_up_duration)
            y3_b4 = y3[i]         

        line1.set_ydata(y1)
        line2.set_ydata(y2)
        line3.set_ydata(y3)
        read_count += 1

    return line1, line2, line3

ani = animation.FuncAnimation(fig, animate, interval=100, init_func=animate_init, blit=True)
plt.show()

print('\njob end')

stream.stop_stream()
stream.close()
p.terminate()

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.