まるっとワーク

データ分析・電子工作など気になることを残していきます

Python:SciPy窓関数の作成

時系列データの周波数解析を実施する際のフーリエ変換で使用する窓関数の処理についてまとめます。

開発環境について

$wmic os get caption
>Microsoft Windows 10 Pro

$wmic os get osarchitecture
>64ビット

$python -V
Python 3.7.6

$pip list -V
scipy 1.4.1


目次


窓関数の役割

測定データはある程度のデータ長を持っており、フーリエ変換するデータを切り出す役割を示します。
また、以下に示すような計測データを周期拡張(コピーして繰り返す)した場合に、本来含まれない波長の波が結果に含まれないようにする役割も示します。

計測データと切り出し区間(青枠)
f:id:toku_dango:20210622193006p:plain

周期拡張した計測データ
f:id:toku_dango:20210622193213p:plain
切り出した区間によっては、上記のように信号がつながらなくなり、本来含まれない波長の波が結果に乗ってきてしまう。

サンプルプログラム

SciPyライブラリには多くの窓関数が用意されているので、指定の窓関数を作成できます。
以下サンプルプログラムは、代表的な窓関数であるハニング窓, ハミング窓, ブラックマン窓を示します。

from scipy import signal
from scipy.fftpack import fft, fftshift
import matplotlib.pyplot as plt
import numpy as np

N = 256            # サンプル数
dt = 0.01          # サンプリング間隔
t = np.arange(0, N * dt, dt)  # 時間軸

# 窓関数の一例
fw1 = signal.hann(N)             # ハニング窓
fw2 = signal.hamming(N)          # ハミング窓
fw3 = signal.blackman(N)         # ブラックマン窓

# 時間信号
fig = plt.figure(figsize=(12.0, 5.0))
plt.plot(t, fw1, label='hann')
plt.plot(t, fw2, label='hamming')
plt.plot(t, fw3, label='blackman')
plt.xlabel("data number", fontsize=12)
plt.ylabel("Amplitude", fontsize=12)
plt.grid()
leg = plt.legend(loc=1, fontsize=15)

#周波数特性
num = 0
window = [fw1, fw2, fw3]
fig = plt.figure(figsize=(12.0, 5.0))
for i in window:
    A = fft(i, 2048) / (len(i)/2.0)
    freq = np.linspace(-0.5, 0.5, len(A))
    response = 20 * np.log10(np.abs(fftshift(A / abs(A).max())))
    if num == 0:
        plt.plot(freq, response, label='hann', linewidth = 0.5)
    elif num == 1:
        plt.plot(freq, response, label='hamming', linewidth = 0.5)  
    else:
        plt.plot(freq, response, label='blackman', linewidth = 0.5)
    num += 1
    
plt.axis([-0.5, 0.5, -120, 0])
plt.title("Frequency response of window")
plt.ylabel("Normalized magnitude [dB]")
plt.xlabel("Normalized frequency [cycles per sample]")
plt.grid()
leg = plt.legend(loc=1, fontsize=15)


実行結果
f:id:toku_dango:20210622200039p:plain
グラフ1つ目が窓関数、
グラフ2つ目が窓関数の周波数特性を示しています。
どの窓関数とも、データ軍の両端は0に近いため、窓関数を掛けたデータの両端が0近くなり、周期拡張した時に信号がつながらなくなるという問題がなくなります。
ちなみにですが、フーリエ変換で使用するScipyモジュール (scipy.signal.stft)はデフォルトの窓関数がハニング窓になっているようです。

まとめ

今回はScipyを使った窓関数作成方法を紹介しました。
この辺は特に気にせずフーリエ変換、周波数特性を確認していたので、役割含めて良い勉強になりました。

Python:ディジタルフィルタの設計① 移動平均フィルタ

Pythonから利用できる数学処理ライブラリ、Scipyを使ったディジタルフィルタの設計・評価方法をまとめました。
と言っても数学的な話やディジタルフィルタについての概要は飛ばし、フィルタ設計・評価に使えるサンプルプログラムを載せただけですが、何かに使えれば幸いです。

サンプルコード

本記事で載せているコードの完全版は、以下パス先で共有しています
github.com

開発環境について

$wmic os get caption
>Microsoft Windows 10 Pro

$wmic os get osarchitecture
>64ビット

$python -V
Python 3.7.6

$pip list -V
scipy 1.4.1


目次


サンプルプログラム

代表的なIIRフィルタである以下フィルタの設計・評価のためのコードをまとめており、フィルタの周波数特性や位相特性、群遅延プロット、信号のフィルタ処理出力ができます。
※本記事ではまず移動平均フィルタのみを紹介

続きの記事

移動平均以外のフィルタ特性及びコードを載せています
dango-study.hatenablog.jp

  1. 移動平均フィルタ
  2. ButterWorseフィルタ
  3. 第一種Chebyshevフィルタ
  4. 第二種Chebyshevフィルタ
  5. Besselフィルタ


紹介するデジタルフィルタ出力例

※サンプリング 0.005sec
グラフ1つ目がフィルタの周波数特性
グラフ2つ目がフィルタの位相特性

評価用のサンプル信号について

フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力するコードです。
サイン波にノイズを足したものをサンプル信号とします。

dt = 0.005 #1stepの時間[sec]
fs = 1/dt
times  =  np.arange(0,10,dt)
N = times.shape[0]

f  = 5  #サイン波の周波数[Hz]
sigma  = 0.5 #ノイズの分散

np.random.seed(1)
# サイン波
x_s =np.sin(2 * np.pi * times * f) 
x = x_s  +  sigma * np.random.randn(N)

x_np = np.array(x)

# サイン波+ノイズ
y_s =  np.zeros(times.shape[0])
y_s[:times.shape[0]//2] = 1
y = y_s  +  sigma * np.random.randn(N)

plt.figure(figsize=(12,6))
plt.subplot(3,1,1)
plt.plot(times, x, c="red", label=("sin+noise"))

#軸調整
plt.xlim(0,1)
plt.yticks([-2, -1.5, -1, -0.5, 0, 0.5,1, 1.5, 2])
plt.grid(which = "major", axis = "y", color = "gray", alpha = 0.8,
        linestyle = "--", linewidth = 1)
plt.legend()

def FFT(Raw_Data, sampling_time):
    sampling_time_f = 1/sampling_time
    fft_nseg = 256
    fft_overlap = fft_nseg // 2
    f_raw, t_raw, Sxx_raw = signal.stft(Raw_Data, sampling_time_f, nperseg=fft_nseg, noverlap=fft_overlap)

    return f_raw, t_raw, Sxx_raw

def get_start_end_index(t, start_t, end_t):
    start_ind=0
    while(t[start_ind] < start_t):
        start_ind += 1
    end_ind = start_ind + 1
    while (t[end_ind] < end_t):
        end_ind += 1
    return start_ind, end_ind

def ave_spectrum(Sxx, t, start_t, end_t):
    start_ind, end_ind = get_start_end_index(t, start_t, end_t)
    ave_spectrum = np.zeros(Sxx.shape[0])
    for i in range(Sxx.shape[0]):
        ave_spectrum[i] = np.average(Sxx[i, start_ind:end_ind])
    return ave_spectrum

f_raw, t_raw, Sxx_raw = FFT(x_np, dt)

plt.subplot(3,1,2)
plt.pcolormesh(t_raw, f_raw, np.abs(Sxx_raw),vmin=0, vmax=0.4, cmap="jet")

plt.subplot(3,1,3)
start = 0
end = 2
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="blue", label=("normal"))
plt.legend()

# プロット表示(設定の反映)
plt.show()


実行結果は以下の通りです。

グラフ1つ目がサンプル信号の時系列です
グラフ2つ目がサンプル信号の周波数特性を示した時系列です。このグラフは関数「FFT」の出力結果で、横軸:時間 縦軸:周波数 色が出力(db)を示しており、周波数特性を時系列的に見たい場合に有効です。この画像だと5Hz付近で赤色となっており、グラフ3つ目の周波数特性とも合っていることが分かります。
グラフ3つ目がサンプル信号の周波数特性です。「start」「end」に時間を入力し、その時間範囲のデータでの周波数特性を示します。関数「FFT」の結果を関数「get_start_end_index」「ave_spectrum」で切り分け、平均化処理させた結果を描画しています。

移動平均フィルタについて

移動平均フィルタは、ローパスフィルタとして活用されます。
以下がサンプルコードとなり、フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力する関数です。
「move_num」移動平均個数であり、この値を変えていただくと、任意のフィルタについて評価できます。
「dt」サンプリングタイムであり、今回はサンプル信号に合わせています。
関数「character_MA」がフィルタ特性の出力用関数
関数「MA_filter」が実際のフィルタ処理用関数になります。

move_num = 5 #移動平均個数設定
dt = 0.005 #SamplingTime
fs = 1/dt  #周波数

class MA_Filter():
    def __init__(self, master=None):
        print("MA_Filter")

    def character_MA(self, N, fs, a=1):
        b = np.ones(N) / N
        fn = fs / 2# ナイキスト周波数計算

        w, h = signal.freqz(b, a)# 周波数応答計算
        x_freq_rspns = w / np.pi * fn
        y_freq_rspns = db(abs(h))#複素数をデシベル変換

        p = np.angle(h) * 180 / np.pi#位相特性
        w, gd = signal.group_delay([b, a])# 群遅延計算
        x_gd = w / np.pi * fn
        y_gd = gd

        return x_freq_rspns, y_freq_rspns, p, x_gd, y_gd

    def MA_filter(self, x, move_num):
        move_ave = []
        ring_buff = np.zeros(move_num)
        for i in range(len(x)):
            num = i % move_num
            if i < move_num:
                ring_buff[num] = x[i]
                move_ave.append(0)
            else:
                ring_buff[num] = x[i]
                ave = np.mean(ring_buff)
                move_ave.append(ave)
        return move_ave

MFilter = MA_Filter()

フィルタ特性について

フィルタ特性確認用のサンプルコードです。

x_freq_rspns_ave, y_freq_rspns_ave, p_ave, x_gd_ave, y_gd_ave = MFilter.character_MA(move_num, fs, a=1)

# グラフ表示
plt.figure(figsize=(12,6))
# 周波数特性
plt.subplot(3, 1, 1)
plt.plot(x_freq_rspns_ave, y_freq_rspns_ave)
plt.xlim(0, 100)
plt.ylim(-200, 2)
plt.ylabel('Amplitude [dB]')
plt.grid()

# 位相特性
plt.subplot(3, 1, 2)
plt.plot(x_freq_rspns_ave, p_ave)
plt.xlim(0, 100)
plt.ylim(-200, 200)
plt.xlabel('Frequency [Hz]')
plt.ylabel('Phase [deg]')
plt.grid()

# 群遅延プロット
plt.subplot(3, 1, 3)
plt.plot(x_gd_ave, y_gd_ave)
plt.xlim(0, 100)
plt.ylim(-5, 15)
plt.xlabel('Frequency [Hz]')
plt.ylabel('Group delay [samples]')
plt.grid()
plt.show()


実行結果は以下の通りです。

グラフ1つ目がフィルタの周波数特性
グラフ2つ目がフィルタの位相特性
グラフ3つ目がフィルタの群遅延を示しています。
群遅延は、フィルタ処理によって何サンプル分信号が遅れるのかを示しており、その確認ができます。

フィルタ処理結果の確認

フィルタ処理結果確認用のサンプルコードです。
「x」に任意の信号を入れていただくことで処理ができますので、処理したい信号を「x」に入れて試してみてください。

moveave_x = MFilter.MA_filter(x, move_num)

plt.figure(figsize=(12,6))
plt.subplot(3,1,1)
plt.plot(times, x, c="red", label=("sin+noise"))
plt.plot(times, x_s, c="blue", label=("sin"))
plt.plot(times, moveave_x, c="green", label=("MA"))

#軸調整
plt.xlim(0,1)
plt.yticks([-2, -1.5, -1, -0.5, 0, 0.5,1, 1.5, 2])
plt.grid(which = "major", axis = "y", color = "gray", alpha = 0.8,
        linestyle = "--", linewidth = 1)
plt.legend()

f_raw, t_raw, Sxx_raw = FFT(x_np, dt)
plt.subplot(3,1,2)
plt.pcolormesh(t_raw, f_raw, np.abs(Sxx_raw),vmin=0, vmax=0.4, cmap="jet")

plt.subplot(3,1,3)
start = 0
end = 2
f_raw, t_raw, Sxx_raw = FFT(x, dt)
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="red", label=("sin+noise"))
f_raw, t_raw, Sxx_raw = FFT(x_s, dt)
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="blue", label=("sin"))
f_raw, t_raw, Sxx_raw = FFT(moveave_x, dt)
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="green", label=("MA"))
plt.legend()

# プロット表示(設定の反映)
plt.show()


実行結果は以下の通りです。

グラフ1つ目の緑プロットが移動平均フィルタ後の出力となります。※フィルタ処理前が赤プロット
グラフ2つ目がフィルタ後出力の周波数特性を示した時系列です。
グラフ3つ目の緑プロットが移動平均フィルタ後の周波数特性になります。
今回作成した移動平均フィルタのフィルタ特性を見ると、20Hz付近から減衰するようなフィルタ特性を持っており、グラフ3つ目の緑プロットと赤プロットを比較して、設計通りのフィルタが作成できていることが分かります。

まとめ

今回はScipyを使ったディジタルフィルタの設計・評価で使えるコードを紹介しました。
フィルタ特性を確認と評価が合わせてできるように作成したので、使えるケースもあるかもしれません。
IIRの別のフィルタについても書いていく予定です。

Rasberry PiとSORACOMサービスとで異常検知システムを作成 2/2 (機能の実装編)

f:id:toku_dango:20210515193032p:plain
続いては、「機能の実装」 についてまとめていきます。

目次


実施したいこと(再掲)

作業者の異常を検知して、管理者に通知するシステムの作成が目標となるため、以下機能の実現を目指しました。

機能①:作業者(人)を認識して、異常を検知する
機能②:異常を検知したら、その旨をスマートスピーカーで管理者に通知する
機能③:異常を検知したら、状況の画像と位置情報をLINEで管理者に通知する
機能④:異常を検知したら、状況の画像と位置情報をSORACOM Lagoonダッシュボードで表示
おまけ機能:新型コロナ対策 密集検知機能(規定人数以上の人が会議室内にいる場合にその旨を管理者に通知する)

人の認識機能を有した市販の監視カメラは最近出てきましたが、まだまだ高価ですね。また、その中でも異常を検知する機能を有したものもそこまで多くないと思います。

今回SORACOMサービスを使用した理由としては、Wi-Fiが届かない様々な場所にも設置することを想定しているからです。
SORACOMではRaspberry Pi等に接続し、セルラー通信が可能となるUSBドングルがあるので、今回はそれを使用しています。

下記に本システムの概要図を示します。

f:id:toku_dango:20210508152246p:plain
システム概要図

機能の実装に向けて

  1. システム概要とセットアップ
  2. 機能の実装 (本ページ)


1. 異常検知の実装

異常検知は、人物検知と動体検知を組み合わせて実施します。
人物検知は、前回説明した物体検知の機能を使います。
動体検知は、動いている物体を検知することで、時刻違いの画像(前画像と現画像)の差分を求めることで検知します。
今回は、作業員が単独作業中に動けなくなることを異常状態と定義し、人物検知による作業員数を把握/動体検知によって、異常状態を判断します。
人間の姿勢推定して倒れているかどうかを判断する等、他にも良い方法もありそうなのですが、今回はプロトタイプとしてこの方法とします。

f:id:toku_dango:20210516162037p:plain

機能追加のためコード修正は、「Object Detection Tools」の「object_detection.py」をベースとして修正していきます。
コードが長いため、完全なソースコードここ「employee_anomaly_detection.py」に示します。「object_detection.py」から追加した機能についての説明を以下にまとめています。
抜粋した箇所(機能として働く関数箇所)の説明になるため、コードを試す場合は上記コードをダウンロードして実行することをお勧めします。

コードをダウンロードして実行をする場合は以下の通り実行ください。

$ cd ~\EmployeeAnomalyDetection\ObjectDetection\scripts
$ python3 employee_anomaly_detection.py#実行用コード

実行後に以下の通りウィンドウが現れ、異常検知(Emergency_Judge)と密集検知(Capacity_Judge)のカウンターが表示されます。
Tensor flowに関連した警告が出るかもしれませんが、実行上は問題がないためとりあえずは無視しています
f:id:toku_dango:20210516153819p:plain

1.1 人物検知について

人物検知は「Object Detection Tools」物体検知の機能を使わせていただくので、
機能自体に修正はないのですが、「employee_anomaly_detection.py」では人数をカウントする機能を追加します。
object_detection.py内の182行目~187行で検出した物体名を出力しており、
検出した物の名前をlistに格納させます。
181行と188行、216行辺りに以下の通りコードを追加。

      # convert bgr to rgb
      image_np = img_bgr[:,:,::-1]
      image_np_expanded = np.expand_dims(image_np, axis=0)
      start = time.time()
      output_dict = run_inference_for_single_image(image_np_expanded, detection_graph)
      elapsed_time = time.time() - start
        
      befDetectList = [] #<<<<<追加する 検出した物体名を格納するリストの定義
      personNum = 0 #<<<<<追加する 検出人数を0で置く

      for i in range(output_dict['num_detections']):
        class_id = output_dict['detection_classes'][i]
        if class_id < len(labels):
          label = labels[class_id]
        else:
          label = 'unknown'
          
        befDetectList.append(label)#<<<<<追加する 検出した物体名をリストに格納

        detection_score = output_dict['detection_scores'][i]

        if detection_score > 0.5:
            # Define bounding box
            h, w, c = img.shape
            box = output_dict['detection_boxes'][i] * np.array( \
              [h, w,  h, w])
            box = box.astype(np.int)

            speed_info = '%s: %.3f' % ('fps', 1.0/elapsed_time)
            cv2.putText(img, speed_info, (10,50), \
              cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)

            if mode == 'bbox':
              class_id = class_id % len(colors)
              color = colors[class_id]

              # Draw bounding box
              cv2.rectangle(img, \
                (box[1], box[0]), (box[3], box[2]), color, 3)

              # Put label near bounding box
              information = '%s: %.1f%%' % (label, output_dict['detection_scores'][i] * 100.0)
              cv2.putText(img, information, (box[1] + 15, box[2] - 15), \
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 1, cv2.LINE_AA)
            elif mode == 'mosaic':
              img = mosaic_area(img, box[1], box[0], box[3], box[2], ratio=0.05)

            if label ==  "person": #<<<<<追加する
              personNum += 1  #<<<<<追加する 検出した人物数を数える

1.2 動体検知について

OpencCVモジュールの関数を使用しています。
動体検知までの基本的な流れは、以下の通りです。

  1. 画像をグレースケールに変換
  2. 時刻違いの画像(前画像と現画像)の差分を計算
  3. 上記差分値としきい値とを比較して動体を認識(検知)

処理の詳細は、以下で詳しく解説されています。
developers.cyberagent.co.jp

「employee_anomaly_detection.py」の動体検知に関するコードは以下の通りです。

moveThreshold=1000 #<<<<<<動体検知 しきい値
avg=None

def moveDetect(img, avg):
    moveFlg = 0
    #画像をグレースケールに変換
    grayImg=cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    #前回画像がない場合の処理
    if avg is None:
        avg = grayImg.copy().astype("float")
        return avg, moveFlg

    #前画像との差分を取得する
    cv2.accumulateWeighted(grayImg, avg, 0.00001)
    delta = cv2.absdiff(grayImg, cv2.convertScaleAbs(avg))
    thresh = cv2.threshold(delta, 50, 255, cv2.THRESH_BINARY)[1]
    contours, h = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)  

    #画像内 差分箇所のうち最大箇所を抽出
    max_area=0
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if max_area < area:
            max_area = area

    #動体判定
    if max_area > moveThreshold:
        moveFlg = 1

    #今回取得画像を保存
    avg = grayImg.copy().astype("float")

    return avg, moveFlg

変数moveThreshold が動体検知のしきい値であり、動体検知がうまくいかない場合は
必要に応じて変更ください。

1.3 異常検知について

異常検知までの流れは、以下の通りです。

  1. 人物検知にて画面に映る人数が1名の場合(1.1 人物検知の変数 personNum==1)、かつ動体検知機能で動体が映らない時(1.2 動体検知の変数 moveFlg = 0)に異常検知カウンターを上げる。
  2. 異常検知カウンターがしきい値を超える、かつ最初にカウンターを上げた時から規定時間以上 時間が経っている場合に異常検知フラグを立てる


「employee_anomaly_detection.py」異常検知に関するコードは以下の通りです。
今回は、異常検知カウンターが60以上でかつ3分以上時間が経っている場合に異常検知フラグが立つように設定しています。

import datetime #<<<<<<モジュールのimportが必要です

#emergencyJudge
BUILDING_NAME = "A" #建物番号
ROOM_NUM = "202" #部屋番号
interval=300 #通知間隔

MovBefTimes=[0,0,0,0,0,0]
MovJudgeCounter = 0 #<<<<<<異常検知カウンター
MovResetTime=30 #<<<<<<異常検知カウンター リセット時間
MovAlertThreshold = 60 #<<<<<<異常検知 カウンター しきい値
MovAlertWaitTime = 180 #<<<<<<通知待機時間設定
#画像の格納パス
pictpath='/home/pi/Desktop/CheckView/ObjectDetection/picts' #<<<<<<画像を保存するパス

#現在日付を取得
nowstr=datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
nowTime=time.time()

def emergencyJudge(img, nowTime, MovJudgeCounter):

    if MovJudgeCounter == 0:
        MovJudgeCounter = 1
        #初回検知時間を保存
        MovBefTimes[0]=int(nowTime)
    else:
        if int(nowTime) - MovBefTimes[1] < MovResetTime:
             MovJudgeCounter += 1
        else:
            MovJudgeCounter = 0
    print("MoveCounter=", MovJudgeCounter)
    #検知時間を保存
    MovBefTimes[1]=int(nowTime)
        
    #しきい値及び経過時間が規定以上になった場合に通知
    if MovJudgeCounter >= MovAlertThreshold and int(nowTime) - MovBefTimes[0] > MovAlertWaitTime:
        if int(nowTime) - MovBefTimes[2] > interval:
            #通知時間を保存
            MovBefTimes[1]=int(nowTime)
            #ファイルに保存 
            filename=pictpath+'/'+nowstr+'_EmergencyCall.png' #<<<<<<画像を保存する
            cv2.imwrite(filename, img)
            """通知機能を使う場合はコメントアウトを外す
            messageLine = "従業員異常通知_会議室" + BUILDING_NAME + ROOM_NUM
            messageAlexa = BUILDING_NAME + ROOM_NUM + "従業員の異常を確認しました。状況を確認してください。"
            lineMessage(filename, messageLine)
            alexaMessage(messageAlexa)
            sendSoracom(filename)
            """
                
    return MovJudgeCounter


1.4 おまけ機能について

おまけ機能は、新型コロナ対策 密集検知機能です。
会議室にカメラを設置し、規定人数以上の人が会議室内にいる場合にその旨を管理者に通知します。

密集検知までの流れは、以下の通りです。

  1. 人物検知にて会議室内の人数をカウント(1.1 人物検知の変数 personNumに対応)
  2. 上記を会議室内の人数として、規定人数以上場合に密集検知フラグを立てる


「employee_anomaly_detection.py」の密集検知用の関数は以下の通りです。
今回は規定人数を3人としています (CAPACITY_NUM = 3)

import datetime #<<<<<<モジュールのimportが必要です
#capacityJudge
BUILDING_NAME = "A" #建物番号
ROOM_NUM = "202" #部屋番号
interval=300 #通知間隔

CaBefTimes=[0,0,0,0,0,0]
CAPACITY_NUM = 3 #<<<<<<密集検知 しきい値 (3人以上で密集検知カウンターを上げる)
CaJudgeCounter = 0 #<<<<<<密集検知 カウンター
CaResetTime=20 #<<<<<<密集検知カウンター リセット時間
CaAlertThreshold = 60 #<<<<<<密集検知 カウンター しきい値

def capacityJudge(personNum, img, nowTime, CaJudgeCounter):
    if personNum >= (CAPACITY_NUM / 2):
        
        if CaJudgeCounter == 0:
            CaJudgeCounter = 1
        else:
            if int(nowTime) - CaBefTimes[0] < CaResetTime:
                CaJudgeCounter += 1
            else:
                CaJudgeCounter = 0
        #検知時間を保存
        CaBefTimes[0]=int(nowTime)
        print("CapacityCounter=", CaJudgeCounter)
            
        #通知(一定時間間隔)
        if CaJudgeCounter >= CaAlertThreshold:
            if int(nowTime) - CaBefTimes[1] > interval:
                #通知時間を保存
                CaBefTimes[1]=int(nowTime)
                #ファイルに保存 
                filename=pictpath+'/'+nowstr+'MaxCapacity.png'#<<<<<<画像を保存する
                cv2.imwrite(filename, img)
                """通知機能を使う場合はコメントアウトを外す
                messageLine = "密集防止アラート_会議室" + BUILDING_NAME + ROOM_NUM
                messageAlexa = BUILDING_NAME + ROOM_NUM + "会議室の収容人数を超過しています。注意喚起を実施ください。"
                lineMessage(filename, messageLine)
                alexaMessage(messageAlexa)
                sendSoracom(filename)
                """

    return CaJudgeCounter


2. 通知機能の実装

通知機能は、以下3種類で構成されており、異常検知フラグ及び密集検知フラグが立った場合に動かします。

  • Amazon Echoで音声通知
  • LINEで状況の画像と位置情報を通知
  • SORACOM Lagoonダッシュボードで状況の画像と位置情報を表示

2.1 Amazon Echoで音声通知

Amazon Echoを喋らせる方法は、以前の記事にまとめており、「alexa_remote_control」の機能を使います。スマートスピーカーAPIを使えば、非常に簡単に音声通知ができますね。
「employee_anomaly_detection.py」の対象となる関数は以下の通りです。
※以下に示すコードの他、同じパスに置く「alexa-remote_control.sh」ファイル内の設定も必要です。

import subprocess #<<<<<<モジュールのimportが必要です
import requests #<<<<<<モジュールのimportが必要です

def alexaMessage(message):
    message = "speak:" + message
    cmd = ["./alexa_remote_control.sh", "-e", message] #messageに通知したい文字を入れる
    res = subprocess.call(cmd)


2.2 LINEで状況の画像と位置情報を通知

LINEで通知する方法も、以前の記事にまとめており、こちらは「LINE Notify」の機能を使います。
「employee_anomaly_detection.py」の対象となる関数は以下の通りです。

def lineMessage(fname, message):
    url = "https://notify-api.line.me/api/notify"
    token = [""] #ここにLINE Notifyのトークンを入力
    for i in token:
        headers = {"Authorization" : "Bearer "+ i}
        payload = {"message" :  message} 
        files = {"imageFile": open(fname, "rb")}
        r = requests.post(url, headers = headers, params=payload, files=files)
        print(r.text)


通知結果
異常検知後に画像とメッセージがLINEに送られます。
※密集検知後も同様に画像とメッセージがLINEに送られます。
f:id:toku_dango:20210515214900p:plain

2.3 SORACOM Lagoonダッシュボードで状況の画像と位置情報を表示

SORACOM Lagoonダッシュボードで、状況が確認できるように設定します。
具体的には、以下のように設定をしました。
f:id:toku_dango:20210515224106p:plain
異常検知、密集検知後に画像と位置情報がSORACOM側に送られ、ダッシュボードで表示されます。
※位置情報は、サンプルとして大阪城に設定しています。

2.3.1 ダッシュボードの設定

ダッシュボードは、SORACOM LagoonのUIを使って設定します。
簡単に設定できますので、公式ページ(以下URL)を参考にして下さい。
users.soracom.io

画像ファイルを SORACOM Lagoonで表示させる方法も別ページで詳しく説明されています。
Getting Started: Harvest Files の画像を SORACOM Lagoon で表示する | SORACOM Harvest | SORACOM Users

2.3.2 SORACOM Harbest Data/Harvest Filesにファイルやデータを送る

SORACOM Lagoon ダッシュボードでファイルやデータを表示させます。
今回は、以下ファイルとデータを送ります。

ファイル: 画像ファイル
データ: 緯度経度情報, 部屋番号


「employee_anomaly_detection.py」の対象となる関数は以下の通りです。

def sendSoracom(filename):
    send_imageFile ='curl -v -X PUT --data-binary @'  + filename + ' -H content-type:images/png http://harvest-files.soracom.io/lagoon/harvest.png' #<<<<<<画像ファイルを送る
    send_message ='curl -v -X POST -H content-type:application/json -d {\"latitude\":34.6872318,\"longitude\":135.5259173,\"room_num\":' + ROOM_NUM + '} http://harvest.soracom.io' #<<<<<<緯度経度/部屋番号情報を送る(緯度経度は、サンプルとして大阪城の位置情報となっています)
    res = subprocess.call(send_message.split())
    res = subprocess.call(send_imageFile.split())


おわりに

今回は、人物検知と動体検知機能と組み合わせることで、
異常・密集判定し、3種類の方法で通知ができるようにしました。
異常検知に際して、カメラを作業現場に固定しておくことができるため
「装置を装着する等の作業員の追加作業が必要ない」という目標は、達成できたのではないかと思います。
ただ、「異常を正しく検知する」に関しては、まだまだ改善の余地がありそうなので、今後の課題と考えています。
機能の実装については、部分的な説明で大変申し訳ありませんが、試しに使ってみたい方は「employee_anomaly_detection.py」を使って試してください。

物体検知機能を使用すると、他にも色々なことができそうなので、今後も応用方法を考えて実行したいと思います。

Rasberry PiとSORACOMサービスとで異常検知システムを作成 1/2 (システム概要とセットアップ編)

f:id:toku_dango:20210515193032p:plain
会社での作業員異常通報に関する改善が図れそうだったので、試作をしてみました。
ここでは、Rasberry PiとSORACOMサービスとを組み合わせた作業員の異常状態検知 監視システムについてご紹介します。


目次


背景

私が所属している会社では、作業員が危険な状態に陥った場合を検知/通報する「作業員異常通報装置」が運用されています。
危険な状態とは、体調不良や事故で他の人に助けを求める必要がある状態のことです。
単独作業の場合は、この装置を身に付けるルールとなっていますが、「きちんと使われない」「誤作動が多い」など運用に難ありな状況です。

作業員異常通報装置の運用

該当の装置についてちょっとだけ紹介します。※装置画像はあえて載せません
この装置は、ズボンのベルトに装着し、手動のスイッチと人の転倒を検知するセンサのいずれかが作動することで所定の場所に無線連絡する装置です。
単独作業時に装着するルールになっていますが、急に一人になる場合/装着し忘れる場合があります。また、装置の装着位置や作業内容によって、「転倒を検知するセンサが誤報する」という問題があり、誤報時には管理者が現場確認に行く手間が発生します。このような状況では、誤報を恐れて積極的に装着したくないという気持ちも出てくるのではと思っています。

上記問題点を踏まえて、「装置を装着する等の作業員の追加作業が必要ない」「異常を正しく検知」方法を検討しました。また、誤報に対する追加の対策として、管理者が遠隔で即座に状況確認できる方法も検討しました。

実施したいこと

作業者の異常を検知して、管理者に通知するシステムの作成が目標となるため、以下機能の実現を目指しました。

機能①:作業者(人)を認識して、異常を検知する
機能②:異常を検知したら、その旨をスマートスピーカーで管理者に通知する
機能③:異常を検知したら、状況の画像と位置情報をLINEで管理者に通知する
機能④:異常を検知したら、状況の画像と位置情報をSORACOM Lagoonダッシュボードで表示
おまけ機能:新型コロナ対策 密集検知機能(規定人数以上の人が会議室内にいる場合にその旨を管理者に通知する)

人の認識機能を有した市販の監視カメラは最近出てきましたが、まだまだ高価ですね。また、その中でも異常を検知する機能を有したものもそこまで多くないと思います。

今回SORACOMサービスを使用した理由としては、Wi-Fiが届かない様々な場所にも設置することを想定しているからです。
SORACOMではRaspberry Pi等に接続し、セルラー通信が可能となるUSBドングルがあるので、今回はそれを使用しています。

下記に本システムの概要図を示します。

f:id:toku_dango:20210508152246p:plain
システム概要図


使ったもの

  • Rasberry Pi 4 Model B+

www.amazon.co.jp

 公式のカメラモジュールなります。USBカメラでも代用がきくと思います。
www.amazon.co.jp

 SORACOMのSIMを入れて使うUSBモデムです。
 USBに接続状態を示すインジケーター付いているので、
 状態が分かりやすいのが利点です。

 音声で通知するために、今回はスマートスピーカーを使用します。
 Amazon Echoを喋らせる方法は色々とあるので、使い勝手が良いです。
 Amazon Echoを喋らせる方法は、以下にまとめています。
dango-study.hatenablog.jp

機能実装に向けて

機能①~④およびおまけ機能は、以下機能を組み合わせて実現します。
実装までの説明が少し長くなりそうなので、ここではセットアップまでを記載します。

  1. システム概要とセットアップ (本ページ)
  2. 機能の実装 (次ページ)


f:id:toku_dango:20210516162100p:plain

環境の構築

1. ハードウェアの構成について

Rasberry Piと周辺機器は以下の通り接続します。
f:id:toku_dango:20210516195405p:plain

本記事で記載の対象となるRasberryPiは、OS(Rasbian)が既にインストールされていることを前提としています。
※開発環境は以下の通り

$python3 -V
Python 3.7.3


2. Camera Moduleを使うための設定

RaspberryPiでカメラを動かす必要があるので、その設定を実施します。
Camera Moduleのセットアップ方法は、多く掲載されているので、ここでは割愛します。
詳細は以下リンクを参照してください。
thepihut.com

動作確認用に、画像をキャプチャするコードを紹介します。
コードを実行して、カメラ映像が写った5秒後の映像をキャプチャして表示するコードとなっています。

from picamera import PiCamera
import os
from time import sleep

camera = PiCamera()
camera.resolution = (300, 200)
camera.framerate = 15
camera.start_preview(alpha=100)
sleep(5)

camera.capture("image1.jpg")
camera.stop_preview()
os.system("xdg-open image1.jpg")


3. SORACOMを使うための設定

SORACOMを使い始めるまでのフロー、SORACOMのUSBモデムをRasberry Piで使用する方法については、公式サイトで説明されているので、そのリンクを紹介します。
SORACOM の利用を始める | ガイド | SORACOM Users
Getting Started: Raspberry Pi と USB モデム | 各種デバイスで SORACOM Air を使用する | SORACOM Users

4. 必要なモジュールのセットアップ

人物検知と異常検知には、ディープラーニングを使った画像認識ライブラリであるTensorFlowとOpenCVを使います。
自身でパッケージをインストールすることも可能ですが、より簡単にインストールができるスクリプトをからあげさんが公開して下さっている(以下URL)ので、それを活用させていただきます。
以下のように、コマンドでスクリプトを「setup-ai.sh」をダウンロードして実行しましょう。
karaage.hatenadiary.jp


$ git clone https://github.com/karaage0703/raspberry-pi-setup #スクリプトファイルのダウンロード
$ cd raspberry-pi-setup && ./setup-ai.sh #スクリプトファイルのパスへ移動/スクリプト実行


5. 物体検知ツールのセットアップ

上記までで開発環境は整いました。
ここからはコードを書いていくことになるのですが、まずは物体検知ができるようするために、同じくからあげさんが公開して下さっている「Object Detection Tools」を活用させていただきます。
人物含めた物体の検知を手軽に実施するには、Googleさんが提供している「Object Detection API」を使う例もあるのですが、
Rasberry Pi で使用するにはデータ容量や実装までのハードルが高かったため、活用させていただきます。
物体検出に必要な学習モデルも学習済みのモデルを用いています。

以下のように、コマンドでスクリプトを「get_ssdlite_mobilenet_v2_coco_model.sh 」をダウンロードして実行しましょう。

$ cd && git clone https://github.com/karaage0703/object_detection_tools #スクリプトファイルのダウンロード
$ cd ~/object_detection_tools/models #スクリプトファイルのパスへ移動
$ ./get_ssdlite_mobilenet_v2_coco_model.sh #スクリプト実行


試しに物体検知を実施する場合は、以下の通りコードを実行ください。
カメラの映像とともに物体検知(認識したものを四角で囲み左下に名前を記載)されていれば成功です。

$ cd ~/object_detection_tools #pythonファイルパス ディレクトリへ移動
$ python3 scripts/object_detection.py -l='models/coco-labels-paper.txt' -m='models/ssdlite_mobilenet_v2_coco_2018_05_09/frozen_inference_graph.pb' -d='raspi_cam' #コード実行


実行結果
f:id:toku_dango:20210516121933p:plain
物体の半分しか映っていなくてもある程度の精度で認識してくれるようです。
簡単にこんなことが試せるのは素晴らしいですね、からあげさんに感謝です。

まとめ

本記事では、システム概要とセットアップについて説明をしました。
物体検知だけでもかなり遊べると思いますが、次ページはこの機能を使って目的の機能を組み込んでいきます。

次の記事

  1. システム概要とセットアップ
  2. 機能の実装 (次ページ)

Python:LINEに通知をする

PythonでLINEにメッセージや画像を送りたい時に、毎回検索しているのでここにまとめておきます。
プログラムの完了通知をLINEに送りたい/センサーの値を監視していて、特定の値になったらその通知をLINEに送りたいなどで使えるかと思います。

1.LINE Notifyのトークン取得

パーソナルアクセストークンを利用することで、Webサービスの登録をせずに通知を設定することができるようです。
詳細は以下公式ページを参考にしてください。
LINE Notify

1.1 LINEのマイページにアクセスする

上記公式ページにアクセスして、自身のLINEアカウントにログイン後、マイページをクリックする。

1.2 LINE Notifyのトークン発行

以下順番でページをクリックして、 発行されたトークンをコピーする。
トークンを発行する>通知を送信するグループを選択>発行する」

2.LINE通知機能を実装

2.1 必要なモジュールをインストールする

 以下コードを実行。

$pip install requests 

2.2 LINEに通知

以下コードを実行する。
LINEにメッセージを送れます。

import requests

def main():
    message =  ""#ここにメッセージを入力
    send_line_notify('message')

def send_line_notify(message):
    """
    LINEに通知する
    """
    url = "https://notify-api.line.me/api/notify"
    token = "" #ここにトークンを入力
    headers = {"Authorization" : "Bearer "+ token}

    headers = {'Authorization': f'Bearer {token}'}
    payload = {"message" :  message}
    requests.post(url, headers = headers, params=payload, files=files)

if __name__ == "__main__":
    main()


+αサンプルコード

Rasberry Piにて、Rasberry Pi Cameraで写真を撮ってLINEに画像とメッセージを送るPythonコードです。

from picamera import PiCamera
from time import sleep
import subprocess, os, sys, re
from datetime import datetime
import time
import requests
import os
import subprocess
from base_camera import BaseCamera

def camera():
    now = datetime.now()
    dir_path = '/home/pi/smart/image/'
    file_name= now.strftime('%Y%m%d%_H') + '.jpg'
    fname    = dir_path + file_name
    try:
        os.mkdir(dir_path)
    except OSError:
        print('Date dir already exists')
    #os.system('raspistill -w 480 -h 360 -o ' + fname)
    camera = PiCamera()
    camera.resolution = (600, 400)
    camera.framerate = 15
    camera.start_preview(alpha=200)
    sleep(4)
    camera.capture(fname)
    camera.stop_preview()
    return fname

def line(fname):
    url = "https://notify-api.line.me/api/notify"
    token = "" #ここにトークンを入力
    headers = {"Authorization" : "Bearer "+ token}

    message =  "写真を撮りました!"
    payload = {"message" :  message}
    files = {"imageFile": open(fname, "rb")}
    r = requests.post(url, headers = headers, params=payload, files=files)
    print(r.text)
    

def main(image=""):
    if image == "":
        call_alexa()
        sleep(5)
        fname = camera()
    else:
        fname = image
    if fname:
      line(fname)

if __name__ == '__main__':
    main()


通知画像とメッセージ
f:id:toku_dango:20210515140217p:plain

まとめ

今回はPythonでLINEに通知する方法をまとめました。
LINEは、普段から使うタイミングが多いと思うので、通知するには良いですね。

RasberryPi:Pythonコードを定期実行する

特定のスクリプトを定期実行したいケースがあったため、指定の時間でコードを実行する方法を紹介します。
今回はcronという機能を使っていきます。

開発環境について

【ハードウェア】
RasberryPi

$python -V
Python 2.7.16


目次


cronが有効になっているかどうかを確認

pi@rasberrypi: ~ $chekconfig cron
>>cron on


上記の通り「cron on」と表示されていれば有効になっているので、問題ありません。
「cron off」と表示されている場合は以下実行して、有効化してください。

pi@rasberrypi: ~ $sudo systemctl enable cron


cronの設定

crontabを起動し、ルールに従って内容を修正して設定します。

pi@rasberrypi: ~ $crontab -e

crontabの書き方

以下ルールに従って、それぞれの項目をスペースで区切って設定します。
crontabを開くと、初期状態では全てコメントアウトされている状態になっているので、その一番下に記載するとよいです。
「分[0-59] 時[0-23] 日[1-31] 月[1-12] 曜日[0-7] 実行内容」

※細かい設定
範囲指定:ハイフンで設定「1-12, 2-4」
複数指定:カンマで区切って設定「1-12, 2-4」
実行間隔:このように記載「*/5」#5分間隔


※書き方例

0 9 1 1 * python /home/pi/Desktop/camera.py #1月1日9時0分にcamera.pyを実行する
0 9 * * 1-5 python /home/pi/Desktop/camera.py #月曜日から金曜日まで、9時0分にcamera.pyを実行する
*/15 * * * * python  /home/pi/Desktop/camera.py #5分ごとにcamera.pyを実行する
@reboot python  /home/pi/Desktop/camera.py #起動時に毎回、camera.pyを実行する

設定後は書き込み(保存)をしましょう。

まとめ

今回はPythonコードを定期実行する方法についてまとめました。
是非活用してください

Amazon Echo(Alexa)を喋らせる

f:id:toku_dango:20210425134042p:plain

Amazon Echoは、話をするようにAmazon Echoに話しかける or Amazon アプリから話すように指示をする等でないと喋ってくれないので、もう少し色々と喋らせる方法はないかと探していました。

本記事ではalexa_remote_controlを使用しますが、その他の方法としてはNode-redを使った方法もあるようです。
他の記事でも色々とまとめられているので、実用性を意識してまとめます。

Alexa-remote-controlのダウンロード

Alexa-remote-controlの「alexa-remote_control.sh」を適当なパスに保存する。

pi@rasberrypi: ~ $wget https://raw.githubuser.com/thorsten-gehrig/alexa-remote-control/master/alexa_remote_control.sh

github.com

Alexa-remote-controlを実行するために必要なライブラリをインストール

JSONツールが必要になるので、インストールする。

pi@rasberrypi: ~ $sudo apt-get install jq


Alexa-remote-controlの設定

alexa-remote-control.sh内に、以下項目の記載があるので、自身のAmazonアカウントに関する情報と必要な設定項目を入力する。

SET_EMAIL='' #Amazonアカウントにログインするためのemailアドレス
SET_PASSWORD='' #Amazonアカウントにログインするためのパスワード
SET_LANGUAGE='ja-JP'
SET_TTS_LOCALE='ja-JP'
SET_AMAZON=''amazon.co.jp 
SET_ALEXA='alexa.amazon.co.jp'


Alexa-remote-controlを使用する

パターン① コマンドラインで実行

pi@rasberrypi: ~ $ ./alexa_remote_control.sh -e "speak:おはよう"


パターン② pythonコード内で実行

import subprocess

cmd = ["./alexa_remote_control.sh", "-e", "speak:おはよう"]
res = subprocess.call(cmd)

Amazon Echoバイスが喋りだしたら成功ですね。"speak:"の後のワードを好きに変えていただければ、そのワードを喋ってくれます。
どちらの方法でも、実行の結果は同じになると思います。

その他コマンドについて

基本公式やalexa-remote-control.sh内にコマンドの詳細記載があるので不要と思いますが、参考まで。


特定の事柄を喋らせる

import subprocess

cmd = ["./alexa_remote_control.sh", "-e", "speak:おはよう"] #自由に設定したワードを喋らせる
cmd = ["./alexa_remote_control.sh", "-e", "weather"] #天気予報について
cmd = ["./alexa_remote_control.sh", "-e", "traffic"] #交通状況について
cmd = ["./alexa_remote_control.sh", "-e", "flashbriefing"] #最新ニュースについて
cmd = ["./alexa_remote_control.sh", "-e", "tellstory"] #適当な話について

res = subprocess.call(cmd)



指定の呼びかけに対する反応

import subprocess

cmd = ["./alexa_remote_control.sh", "-e", "textcommand:おはよう"] 
res = subprocess.call(cmd)


ラジオ再生

import subprocess

cmd = ["./alexa_remote_control.sh", "-r", "s8638"] #第3引数にラジオIDを入力する

res = subprocess.call(cmd)


まとめ

今回はAmazon Echoを喋らせる方法についてまとめました。
色々と組み合わせて、スマートホーム化を図りたいですね。