まるっとワーク

データ分析・電子工作など気になることを残していきます

機械学習について(言葉の定義など、はじめの一歩)


機械学習について学習を始めたので、理解した内容についてまとめて説明していきたいと思います。

目次

機械学習とは?

コンピューターサイエンティストで有名なRobert Elias Schapire氏は以下の通り述べている。

studies how to automatically learn automatically learn to make accurate
predictions predictions based on past observations

(Introduction to Machine Learning)


"過去の観察に基づいた正確な予想を自動的に行うこと"といった感じで、人が行う学習行為をPC上で表現するというニュアンスが入っている。
また、機械学習では主体はデータであり、データ内の関係を把握して、その法則(ルール)やアルゴリズムを学習するといったことに使われる。
物理法則や人間が考えたルールから答えを導くといった、ルール主体の考え方ではないことがポイント

機械学習の手法

色々なサイトでも書かれているが、以下の通り3つに分類される。
使うアルゴリズムが違うというよりは、解くべき問題(使用目的)が異なるという観点での分類

  解析目標のラベルが存在し、目標値とその他データとの相関関係の学習から、主に予想で使用される

  データ間の関係などの構造把握に使用される

  ゲームのスコアなどの目標値を最大化することを目的に、その仕組みを学習する為に使用される
※それぞれの代表モデル等は勉強を進めた後まとめていく

精度の高いモデル構築に向けて

より多くの種類・量のデータがあるほど、良い回答を導出できる。
ただし、ある程度目標値と関係あるデータが必要であり、目標値の説明に必要なデータ(変数)作成作業は特徴量エンジニアリングと言われるなど、機械学習において一番難しい。

AutoML(Automated Machine Learning)とは?

機械学習の機能を複数呼び出して使うことができるフレームワークであり、アルゴリズム固有値(ハイパーパラメータ)までも最適化対象となっている。
データの整形やエラー値の修正といった機械学習の前処理が自動化されたものや、ディスプレイツールを含んだものも存在する。
プログラムが得意でない方でも使うことができるという利点やデータサイエンティストの業務時間の削減に効果が期待できる。
対象データにどういったモデルが適しているのか、データ間の関係性の把握のためにAutoMLを使用するケースも多い。

代表的なAutoML

もっとも知名度が高いGoogleが開発したAutoML
文字列などのデータを自動的に整形する機能も有している。
cloud.google.com

Microsoftが開発したAutoML
docs.microsoft.com

IBMが開発したAutoML
www.ibm.com

まとめ

今回は導入ということで機械学習の概要やエンジニア向けの関連情報をまとめました。具体的なモデルについても勉強してまとめていきたいと思います。

matplotlibとseabornの日本語の文字化けを修正する

Pythonから利用できる可視化ライブラリ、matplotlibやseabornにて可視化を行った際の日本語の文字化けを防ぐ方法をまとめていきます。

目次

開発環境について

$wmic os get caption
>Microsoft Windows 10 Pro

$wmic os get osarchitecture
>64ビット

$python -V
Python 3.7.6

$pip list -V
scipy 1.4.1

問題事例

日本語が含まれるグラフを作成する場合、以下の通り文字化けが発生する。

import numpy as np
import matplotlib.pylab as plt

x = np.arange(100)
y = np.sin(x*0.1)
plt.plot(x, y)
plt.xlabel("時間")
plt.ylabel("値")


どうやら、本モジュールでは日本語に対応していない欧文フォント(sans-serif)をしているため日本語表示ができないようです。
f:id:toku_dango:20210924091239p:plain

対応方法

対応方法を調べてみると、Jupyterの設定ファイルを書き換えるなどの方法があるようですが、少し大変そうなので、これとは異なるより簡単な方法についてまとめます。

japanize-matplotlibモジュールをインストール

 このモジュールはmatplotlibのフォント設定を自動で日本語化してくれます。
pypi.org

$ pip install japanize-matplotlib

対応結果

ラベルの文字化けがなくなり、正常に日本語表示されるようになりました。

import numpy as np
import japanize_matplotlib #モジュール追加
import matplotlib.pylab as plt

x = np.arange(100)
y = np.sin(x*0.1)
plt.plot(x, y)
plt.xlabel("時間")
plt.ylabel("値")

f:id:toku_dango:20210924091257p:plain

まとめ

今回はデータ可視化の際の文字化けに対する対応方法をまとめました。
報告用で日本語表記のグラフを作成したい場合に、こういった対応をふと忘れてしまうので、忘れずに使えるようにしたいです。

Pythonが使えるように環境を整える

最近PCを買い替えたので、新しいPCにてPythonが使えるように対応した内容を残しておきます。


目次

開発環境について

Windows10 Home 64bit
Python 3.8.8

Pythonインストール

Pythonのみをインストールする、Anacondaなどの環境構築プラットフォームごとインストールするなど、いろいろと選択肢がありますが、まずはPython利用環境を構築する。

Pythonがインストールされたフォルダパスを確認

デフォルトでは"C:\Users\"以下にインストールされている

環境変数を編集

表示した環境変数テーブルの中の"Path"をダブルクリックして編集する。
f:id:toku_dango:20210912214704p:plain

Python.exeが存在するフォルダパスやScriptsパスにパスを通す。
以下の通りパスを追加
f:id:toku_dango:20210912214900p:plain

上記設定の後、コマンドプロンプトを再起動して"python"と入力してpythonが動作するかを確認
f:id:toku_dango:20210912215040p:plain

コマンドプロンプトpythonを実行するとMicrosoft Storeが開く問題

上記の通り、pythonインストール後にコマンドプロンプトで以下実行した際にMicrosoft Storeが開いてしまうことへの対応

$ python

対処方法

設定→アプリと機能→アプリ実行エイリアスを開く

  
f:id:toku_dango:20210912215541p:plain

アプリインストーラpython.exe (python3.exe)をオフにする

  
f:id:toku_dango:20210912215521p:plain

再度コマンドプロンプトを再起動して"python"と入力してpythonが動作するかを確認!
問題なく動作すれば解決です!

まとめ

最後に紹介したMicrosoft Storeが開いてしまうという問題で、私は詰まっていました。
原因がわからず、少し手間取ったので今後は時間を無駄にしないように上記まとめさせていただきました。

Python:ディジタルフィルタの設計② Butterworth/Chebyshev/Bessel

Pythonから利用できる数学処理ライブラリ、Scipyを使ったディジタルフィルタの設計・評価方法の続きをまとめていきます。
前回同様、数学的な話やディジタルフィルタについての概要は飛ばし、フィルタ設計・評価に使えるサンプルプログラムを載せただけですが、何かに使えれば幸いです。

前の記事:  
dango-study.hatenablog.jp

本記事で紹介したコード:
github.com


サンプルプログラム

代表的なFIRフィルタである以下フィルタの設計・評価のためのコードをまとめており、フィルタの周波数特性や位相特性、群遅延プロット、信号のフィルタ処理出力ができます。

評価用のサンプル信号について

フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力するコードです。
サイン波にノイズを足したものをサンプル信号とします。

dt = 0.005 #1stepの時間[sec]
fs = 1/dt
times  =  np.arange(0,10,dt)
N = times.shape[0]

f  = 5  #サイン波の周波数[Hz]
sigma  = 0.5 #ノイズの分散

np.random.seed(1)
# サイン波
x_s =np.sin(2 * np.pi * times * f) 
x = x_s  +  sigma * np.random.randn(N)

x_np = np.array(x)

# サイン波+ノイズ
y_s =  np.zeros(times.shape[0])
y_s[:times.shape[0]//2] = 1
y = y_s  +  sigma * np.random.randn(N)

plt.figure(figsize=(12,6))
plt.subplot(3,1,1)
plt.plot(times, x, c="red", label=("sin+noise"))

#軸調整
plt.xlim(0,1)
plt.yticks([-2, -1.5, -1, -0.5, 0, 0.5,1, 1.5, 2])
plt.grid(which = "major", axis = "y", color = "gray", alpha = 0.8,
        linestyle = "--", linewidth = 1)
plt.legend()

def FFT(Raw_Data, sampling_time):
    sampling_time_f = 1/sampling_time
    fft_nseg = 256
    fft_overlap = fft_nseg // 2
    f_raw, t_raw, Sxx_raw = signal.stft(Raw_Data, sampling_time_f, nperseg=fft_nseg, noverlap=fft_overlap)

    return f_raw, t_raw, Sxx_raw

def get_start_end_index(t, start_t, end_t):
    start_ind=0
    while(t[start_ind] < start_t):
        start_ind += 1
    end_ind = start_ind + 1
    while (t[end_ind] < end_t):
        end_ind += 1
    return start_ind, end_ind

def ave_spectrum(Sxx, t, start_t, end_t):
    start_ind, end_ind = get_start_end_index(t, start_t, end_t)
    ave_spectrum = np.zeros(Sxx.shape[0])
    for i in range(Sxx.shape[0]):
        ave_spectrum[i] = np.average(Sxx[i, start_ind:end_ind])
    return ave_spectrum

f_raw, t_raw, Sxx_raw = FFT(x_np, dt)

plt.subplot(3,1,2)
plt.pcolormesh(t_raw, f_raw, np.abs(Sxx_raw),vmin=0, vmax=0.4, cmap="jet")

plt.subplot(3,1,3)
start = 0
end = 2
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="blue", label=("normal"))
plt.legend()

# プロット表示(設定の反映)
plt.show()




前の記事と同じものですが、実行結果は以下の通りです。
f:id:toku_dango:20210612231052p:plain
グラフ1つ目がサンプル信号の時系列です
グラフ2つ目がサンプル信号の周波数特性を示した時系列です。このグラフは関数「FFT」の出力結果で、横軸:時間 縦軸:周波数 色が出力(db)を示しており、周波数特性を時系列的に見たい場合に有効です。この画像だと5Hz付近で赤色となっており、グラフ3つ目の周波数特性とも合っていることが分かります。
グラフ3つ目がサンプル信号の周波数特性です。「start」「end」に時間を入力し、その時間範囲のデータでの周波数特性を示します。関数「FFT」の結果を関数「get_start_end_index」「ave_spectrum」で切り分け、平均化処理させた結果を描画しています。

Butterworthフィルタについて

代表的なFIRフィルタで、BPF/LPF/HPFと使い勝手が良いです。
通過帯域は全てを通すフィルタであり、リップル(揺らぎ)が無くいため、除去帯域近くの動きも滑らかです。※高次になれば傾斜は急になります
また、通過帯域は今から説明する他のフィルタと比較して線形な位相応答を示しており、周波数ごとの位相遅れの差もほとんど無いのが特徴です。

以下がサンプルコードとなり、フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力する関数です。
「ftype」数字によってLPF/HPF/BPFを選択できるようになっており、設定結果及び「fl」「fh」を任意の周波数/「order」次数を設定することでフィルタのパラメータを設定します。
関数「character_MA」がフィルタ特性の出力用関数
関数「character_Butter」が実際のフィルタ処理用関数になります。

dt = 0.005 #SamplingTime
fs = 1/dt  #周波数
ftype = 0 #lowpass=0, highpass=1, bandpass=2
fl = None #カットオフ周波数(遮断周波数) 低周波数側
fh = 20 #カットオフ周波数(遮断周波数) 高周波数側
order = 4 #次数

class Butter_Filter():
    def __init__(self, master=None):
        print("Butterworse_Filter")

    def get_filtertype(self, selected_num):
        if selected_num == 0:
            ftype1 = "lowpass"
        elif selected_num == 1:
            ftype1 = "highpass"
        else:
            ftype1 = "bandpass"
        return ftype1

    def get_filterparam(self, dt, fl, fh, order, filtertype):
        dt = float(dt)

        fs = 1 / dt
        fn = fs / 2
        if filtertype == "bandpass":
            Wn = [float(fl)/fn, float(fh)/fn]
            N=order//2
        elif filtertype == "lowpass":
            Wn = float(fh)/fn
            N=order
        else:
            Wn = float(fl)/fn
            N=order
        return Wn, N

    def character_Butter(self, dt, order, fl, fh, ftype):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.butter(N, Wn, ftype)#第四引数Trueならばanarog filter/ FalseならばDigital filter
        w, h = signal.freqz(b, a)#アナログフィルタはfreqs,デジタルフィルタはfreqz

        x_freq_rspns = w / np.pi * fn
        y_freq_rspns = db(abs(h))#複素数をデシベル変換
        
        p = np.angle(h) * 180 / np.pi#位相特性

        w, gd = signal.group_delay([b, a])# 群遅延計算
        x_gd = w / np.pi * fn
        y_gd = gd

        return x_freq_rspns, y_freq_rspns, p, x_gd, y_gd

    def Butter_filter(self, data, dt, order, fl, fh, ftype, worN=8192):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        print(Wn, N)
        b, a = signal.butter(N, Wn, ftype)#第四引数Trueならばanarog filter/ FalseならばDigital filter
        zi = signal.lfilter_zi(b, a)
        z, _ = signal.lfilter(b, a, data, zi=zi*data[0])

        return z

BFilter = Butter_Filter()
ftype = BFilter.get_filtertype(ftype)
print(ftype)
y = BFilter.Butter_filter(x, dt, order, fl, fh, ftype) #信号処理結果
x_freq_rspns_bu, y_freq_rspns_bu, p_bu, x_gd_bu, y_gd_bu = BFilter.character_Butter(dt, order, fl, fh, ftype) #フィルタ特性取得


フィルタ特性について

フィルタ特性確認用のサンプルコードです

# グラフ表示
plt.figure(figsize=(12,6))
# 周波数特性
plt.subplot(3, 1, 1)
plt.plot(x_freq_rspns_bu, y_freq_rspns_bu)
plt.xlim(0, 100)
plt.ylim(-200, 2)  # MATLAB fvtool仕様
plt.ylabel('Amplitude [dB]')
plt.grid()

# 位相特性
plt.subplot(3, 1, 2)
plt.plot(x_freq_rspns_bu, p_bu)
plt.xlim(0, 100)
plt.ylim(-200, 200)
plt.xlabel('Frequency [Hz]')
plt.ylabel('Phase [deg]')
plt.grid()

# 群遅延プロット
plt.subplot(3, 1, 3)
plt.plot(x_gd_bu, y_gd_bu)
plt.xlim(0, 100)
plt.ylim(-5, 15)  # MATLAB fvtool仕様
plt.xlabel('Frequency [Hz]')
plt.ylabel('Group delay [samples]')
plt.grid()
plt.show()

実行結果は以下の通りです。
f:id:toku_dango:20210718133317p:plain
※サンプルコードはローパスフィルタ設定
グラフ1つ目がフィルタの周波数特性
グラフ2つ目がフィルタの位相特性
グラフ3つ目がフィルタの群遅延を示しています。
群遅延は、フィルタ処理によって何サンプル分信号が遅れるのかを示しており、その確認ができます。

フィルタ処理結果の確認

フィルタ処理結果確認用のサンプルコードです。
「x」に任意の信号を入れていただくことで処理ができますので、処理したい信号を「x」に入れて試してみてください。

plt.figure(figsize=(12,6))
plt.subplot(3,1,1)
plt.plot(times, x, c="red", label=("sin+noise"))
plt.plot(times, x_s, c="blue", label=("sin"))
plt.plot(times, y, c="green", label=("Butter"))

#軸調整
plt.xlim(0,1)
plt.yticks([-2, -1.5, -1, -0.5, 0, 0.5,1, 1.5, 2])
plt.grid(which = "major", axis = "y", color = "gray", alpha = 0.8,
        linestyle = "--", linewidth = 1)
plt.legend()

f_raw, t_raw, Sxx_raw = dfilter.FFT(y, dt)
plt.subplot(3,1,2)
plt.pcolormesh(t_raw, f_raw, np.abs(Sxx_raw),vmin=0, vmax=0.4, cmap="jet")

plt.subplot(3,1,3)
start = 0
end = 2
f_raw, t_raw, Sxx_raw = dfilter.FFT(x, dt)
gain = dfilter.ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="red", label=("sin+noise"))
f_raw, t_raw, Sxx_raw = dfilter.FFT(x_s, dt)
gain = dfilter.ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="blue", label=("sin"))
f_raw, t_raw, Sxx_raw = dfilter.FFT(y, dt)
gain = dfilter.ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="green", label=("Butter"))
plt.legend()

# プロット表示(設定の反映)
plt.show()

f:id:toku_dango:20210718133720p:plain
グラフ1つ目の緑プロットがButterworthフィルタ後の出力となります。※フィルタ処理前が赤プロット
グラフ2つ目がフィルタ後出力の周波数特性を示した時系列です。
グラフ3つ目の緑プロットが移動平均フィルタ後の周波数特性になります。
設計通り、20Hz付近から減衰するようなフィルタ特性を持っており、グラフ3つ目の緑プロットと赤プロットを比較して、設計通りのフィルタが作成できていることが分かります。

Chebyshevフィルタについて

通過帯域でのリップルが存在するが、Butterworseと比較して減衰頻度を大きく取れるフィルタ(第一種Chebyshevフィルタ)
減衰頻度をさらに大きく取れるように設計されたフィルタが第二種Chebyshevフィルタ

第一種Chebyshevフィルタ(フィルタ特性/フィルタ処理)

以下がサンプルコードとなり、フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力する関数です。
「ftype」数字によってLPF/HPF/BPFを選択できるようになっており、設定結果及び「fl」「fh」を任意の周波数/「order」次数を設定することでフィルタのパラメータを設定します。
第一種Chebyshevフィルタでは、「attenuation」でリップルについても調整可能です。

dt = 0.005 #SamplingTime
fs = 1/dt  #周波数
ftype = 0 #lowpass=0, highpass=1, bandpass=2
fl = None #カットオフ周波数(遮断周波数) 低周波数側
fh = 20 #カットオフ周波数(遮断周波数) 高周波数側
order = 4 #次数
attenuation = 5 #リップル(通過域最大損失量[dB])

class Chebyshev_Filter():
    def __init__(self, master=None):
        print("Chebyshev_Filter")

    def get_filtertype(self, selected_num):
        if selected_num == 0:
            ftype1 = "lowpass"
        elif selected_num == 1:
            ftype1 = "highpass"
        else:
            ftype1 = "bandpass"
        return ftype1

    def get_filterparam(self, dt, fl, fh, order, filtertype):
        dt = float(dt)

        fs = 1 / dt
        fn = fs / 2
        if filtertype == "bandpass":
            Wn = [float(fl)/fn, float(fh)/fn]
            N=order//2
        elif filtertype == "lowpass":
            Wn = float(fh)/fn
            N=order
        else:
            Wn = float(fl)/fn
            N=order
        return Wn, N

    def character_Chebyshev(self, dt, order, fl, fh, ripple, ftype):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.cheby1(N, ripple, Wn, ftype)
        w, h = signal.freqz(b, a)#アナログフィルタはfreqs,デジタルフィルタはfreqz

        x_freq_rspns = w / np.pi * fn
        y_freq_rspns = db(abs(h))#複素数をデシベル変換
        
        p = np.angle(h) * 180 / np.pi#位相特性

        w, gd = signal.group_delay([b, a])# 群遅延計算
        x_gd = w / np.pi * fn
        y_gd = gd

        return x_freq_rspns, y_freq_rspns, p, x_gd, y_gd

    def Chebyshev_filter(self, data, dt, order, fl, fh, ripple, ftype):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.cheby1(N, ripple, Wn, ftype)#第四引数Trueならばanarog filter/ FalseならばDigital filter

        zi = signal.lfilter_zi(b, a)
        z, _ = signal.lfilter(b, a, data, zi=zi*data[0])

        return z

CFilter = dfilter.Chebyshev_Filter()
ftype = CFilter.get_filtertype(ftype)
print(ftype)
y = CFilter.Chebyshev_filter(x, dt, order, fl, fh, attenuation, ftype)
x_freq_rspns_ch, y_freq_rspns_ch, p_ch, x_gd_ch, y_gd_ch = CFilter.character_Chebyshev(dt, order, fl, fh, ripple, ftype)


フィルタ特性は以下の通りです。
フィルタ次数にもよりますが、Butterworthフィルタと比較して減衰が大きくなっているように見えます。
ただ、位相/群遅延プロットを見ると、周波数によって遅れが変わってきているので、使う際はこの辺を理解したうえで使う必要ありそうです。
f:id:toku_dango:20210718140925p:plain


フィルタ処理結果は以下の通りです。
f:id:toku_dango:20210718141244p:plain
ローパスフィルタとしてうまく機能はしてそうですが、リップルの影響か、sin波の強度が若干低下していますね。
この辺も理解したうえで使う必要ありそうです。

第二種Chebyshevフィルタ(フィルタ特性/フィルタ処理)

第一種Chebyshevフィルタよりも減衰頻度を大きく取れるように設計されたフィルタ
通過帯域にリップルがない代わりに、除去帯域で等リップル性を持つ。
以下がサンプルコードとなり、フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力する関数です。
「ftype」数字によってLPF/HPF/BPFを選択できるようになっており、設定結果及び「fl」「fh」を任意の周波数/「order」次数を設定することでフィルタのパラメータを設定します。
第二種Chebyshevフィルタでは、「attenuation」でリップルについても調整可能です。

dt = 0.005 #SamplingTime
fs = 1/dt  #周波数
ftype = 0 #lowpass=0, highpass=1, bandpass=2
fl = None #カットオフ周波数(遮断周波数) 低周波数側
fh = 20 #カットオフ周波数(遮断周波数) 高周波数側
order = 4 #次数
attenuation = 10 #minimum attenuation(阻止域最小減衰量[dB])

class Chebyshev_second_Filter():
    def __init__(self, master=None):
        print("Chebyshev_Filter")

    def get_filtertype(self, selected_num):
        if selected_num == 0:
            ftype1 = "lowpass"
        elif selected_num == 1:
            ftype1 = "highpass"
        else:
            ftype1 = "bandpass"
        return ftype1

    def get_filterparam(self, dt, fl, fh, order, filtertype):
        dt = float(dt)

        fs = 1 / dt
        fn = fs / 2
        if filtertype == "bandpass":
            Wn = [float(fl)/fn, float(fh)/fn]
            N=order//2
        elif filtertype == "lowpass":
            Wn = float(fh)/fn
            N=order
        else:
            Wn = float(fl)/fn
            N=order
        return Wn, N

    def character_Chebyshev_second(self, dt, order, fl, fh, attenuation, ftype):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.cheby2(N, attenuation, Wn, ftype)
        w, h = signal.freqz(b, a)#アナログフィルタはfreqs,デジタルフィルタはfreqz

        x_freq_rspns = w / np.pi * fn
        y_freq_rspns = db(abs(h))#複素数をデシベル変換
        
        p = np.angle(h) * 180 / np.pi#位相特性

        w, gd = signal.group_delay([b, a])# 群遅延計算
        x_gd = w / np.pi * fn
        y_gd = gd

        return x_freq_rspns, y_freq_rspns, p, x_gd, y_gd

    def Chebyshev_second_filter(self, data, dt, order, fl, fh, attenuation, ftype):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.cheby2(N, attenuation, Wn, ftype)

        zi = signal.lfilter_zi(b, a)
        z, _ = signal.lfilter(b, a, data, zi=zi*data[0])

        return z

CsFilter = Chebyshev_second_Filter()
ftype = CsFilter.get_filtertype(ftype)
print(ftype)
y = CsFilter.Chebyshev_second_filter(x+1, dt, order, fl, fh, attenuation, ftype)
x_freq_rspns_ch2, y_freq_rspns_ch2, p_ch2, x_gd_ch2, y_gd_ch2 = CsFilter.character_Chebyshev_second(dt, order, fl, fh, attenuation, ftype)

フィルタ特性は以下の通りです。
f:id:toku_dango:20210718144639p:plain

フィルタ処理結果は以下の通りです。
f:id:toku_dango:20210718142736p:plain


Besselフィルタについて

最後にBesselフィルタについて紹介します。
Butterworthと非常に似た特性を持つフィルタで、過渡応答性がより滑らかなフィルタ。
通過帯域では線形な位相特性を持つため、信号の遅れも周波数によらずある程度一定を保てることができます。
正直言うとButterworthとBesselフィルタとの使い分けはわかりませんでした。もう少し勉強します。

フィルタ特性/フィルタ処理
dt = 0.005 #SamplingTime
fs = 1/dt  #周波数
ftype = 0 #lowpass=0, highpass=1, bandpass=2
fl = None #カットオフ周波数(遮断周波数) 低周波数側
fh = 20 #カットオフ周波数(遮断周波数) 高周波数側
order = 4 #次数

class Bessel_Filter():
    def __init__(self, master=None):
        print("Chebyshev_Filter")

    def get_filtertype(self, selected_num):
        if selected_num == 0:
            ftype1 = "lowpass"
        elif selected_num == 1:
            ftype1 = "highpass"
        else:
            ftype1 = "bandpass"
        return ftype1

    def get_filterparam(self, dt, fl, fh, order, filtertype):
        dt = float(dt)

        fs = 1 / dt
        fn = fs / 2
        if filtertype == "bandpass":
            Wn = [float(fl)/fn, float(fh)/fn]
            N=order//2
        elif filtertype == "lowpass":
            Wn = float(fh)/fn
            N=order
        else:
            Wn = float(fl)/fn
            N=order
        return Wn, N

    def character_Bessel(self, dt, order, fl, fh, ftype):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.bessel(N, Wn, ftype)
        w, h = signal.freqz(b, a)#アナログフィルタはfreqs,デジタルフィルタはfreqz

        x_freq_rspns = w / np.pi * fn
        y_freq_rspns = db(abs(h))#複素数をデシベル変換

        p = np.angle(h) * 180 / np.pi#位相特性

        w, gd = signal.group_delay([b, a])# 群遅延計算
        x_gd = w / np.pi * fn
        y_gd = gd

        return x_freq_rspns, y_freq_rspns, p, x_gd, y_gd

    def Bessel_filter(self, data, dt, order, fl, fh, ftype, worN=8192):
        fs = 1 / dt
        fn = fs / 2# ナイキスト周波数計算
        Wn, N = self.get_filterparam(dt, fl, fh, order, ftype)
        b, a = signal.bessel(N, Wn, ftype)#第四引数Trueならばanarog filter/ FalseならばDigital filter

        zi = signal.lfilter_zi(b, a)
        z, _ = signal.lfilter(b, a, data, zi=zi*data[0])

        return z

BeFilter = Bessel_Filter()
ftype = BeFilter.get_filtertype(ftype)
print(ftype)
y = BeFilter.Bessel_filter(x, dt, order, fl, fh, ftype)
x_freq_rspns_be, y_freq_rspns_be, p_be, x_gd_be, y_gd_be = BeFilter.character_Bessel(dt, order, fl, fh, ftype)



フィルタ特性は以下の通りです。
f:id:toku_dango:20210718143350p:plain
確かにButterworthフィルタと比較して減衰域が滑らかになっているように見えますね。ただ、通過域と減衰域の境が滑らかになっており攻めたフィルタをしたときに境目付近の信号は減衰されてしまう可能性がありそうですね。


フィルタ処理結果は以下の通りです。
f:id:toku_dango:20210718143405p:plain

まとめ

今回はScipyを使ったディジタルフィルタの設計・評価で使えるコードを紹介しました。
正直位相特性も線形なButterworthが一番汎用性高いのではと思ってしまうのですが、使い分けはもう少し勉強します。

Python:SciPy窓関数の作成

時系列データの周波数解析を実施する際のフーリエ変換で使用する窓関数の処理についてまとめます。

開発環境について

$wmic os get caption
>Microsoft Windows 10 Pro

$wmic os get osarchitecture
>64ビット

$python -V
Python 3.7.6

$pip list -V
scipy 1.4.1


目次


窓関数の役割

測定データはある程度のデータ長を持っており、フーリエ変換するデータを切り出す役割を示します。
また、以下に示すような計測データを周期拡張(コピーして繰り返す)した場合に、本来含まれない波長の波が結果に含まれないようにする役割も示します。

計測データと切り出し区間(青枠)
f:id:toku_dango:20210622193006p:plain

周期拡張した計測データ
f:id:toku_dango:20210622193213p:plain
切り出した区間によっては、上記のように信号がつながらなくなり、本来含まれない波長の波が結果に乗ってきてしまう。

サンプルプログラム

SciPyライブラリには多くの窓関数が用意されているので、指定の窓関数を作成できます。
以下サンプルプログラムは、代表的な窓関数であるハニング窓, ハミング窓, ブラックマン窓を示します。

from scipy import signal
from scipy.fftpack import fft, fftshift
import matplotlib.pyplot as plt
import numpy as np

N = 256            # サンプル数
dt = 0.01          # サンプリング間隔
t = np.arange(0, N * dt, dt)  # 時間軸

# 窓関数の一例
fw1 = signal.hann(N)             # ハニング窓
fw2 = signal.hamming(N)          # ハミング窓
fw3 = signal.blackman(N)         # ブラックマン窓

# 時間信号
fig = plt.figure(figsize=(12.0, 5.0))
plt.plot(t, fw1, label='hann')
plt.plot(t, fw2, label='hamming')
plt.plot(t, fw3, label='blackman')
plt.xlabel("data number", fontsize=12)
plt.ylabel("Amplitude", fontsize=12)
plt.grid()
leg = plt.legend(loc=1, fontsize=15)

#周波数特性
num = 0
window = [fw1, fw2, fw3]
fig = plt.figure(figsize=(12.0, 5.0))
for i in window:
    A = fft(i, 2048) / (len(i)/2.0)
    freq = np.linspace(-0.5, 0.5, len(A))
    response = 20 * np.log10(np.abs(fftshift(A / abs(A).max())))
    if num == 0:
        plt.plot(freq, response, label='hann', linewidth = 0.5)
    elif num == 1:
        plt.plot(freq, response, label='hamming', linewidth = 0.5)  
    else:
        plt.plot(freq, response, label='blackman', linewidth = 0.5)
    num += 1
    
plt.axis([-0.5, 0.5, -120, 0])
plt.title("Frequency response of window")
plt.ylabel("Normalized magnitude [dB]")
plt.xlabel("Normalized frequency [cycles per sample]")
plt.grid()
leg = plt.legend(loc=1, fontsize=15)


実行結果
f:id:toku_dango:20210622200039p:plain
グラフ1つ目が窓関数、
グラフ2つ目が窓関数の周波数特性を示しています。
どの窓関数とも、データ軍の両端は0に近いため、窓関数を掛けたデータの両端が0近くなり、周期拡張した時に信号がつながらなくなるという問題がなくなります。
ちなみにですが、フーリエ変換で使用するScipyモジュール (scipy.signal.stft)はデフォルトの窓関数がハニング窓になっているようです。

まとめ

今回はScipyを使った窓関数作成方法を紹介しました。
この辺は特に気にせずフーリエ変換、周波数特性を確認していたので、役割含めて良い勉強になりました。

Python:ディジタルフィルタの設計① 移動平均フィルタ

Pythonから利用できる数学処理ライブラリ、Scipyを使ったディジタルフィルタの設計・評価方法をまとめました。
と言っても数学的な話やディジタルフィルタについての概要は飛ばし、フィルタ設計・評価に使えるサンプルプログラムを載せただけですが、何かに使えれば幸いです。

開発環境について

$wmic os get caption
>Microsoft Windows 10 Pro

$wmic os get osarchitecture
>64ビット

$python -V
Python 3.7.6

$pip list -V
scipy 1.4.1


目次


サンプルプログラム

代表的なFIRフィルタである以下フィルタの設計・評価のためのコードをまとめており、フィルタの周波数特性や位相特性、群遅延プロット、信号のフィルタ処理出力ができます。
※本記事ではまず移動平均フィルタのみを紹介

  1. 移動平均フィルタ
  2. ButterWorseフィルタ
  3. 第一種Chebyshevフィルタ
  4. 第二種Chebyshevフィルタ
  5. Besselフィルタ


紹介するデジタルフィルタ出力例
f:id:toku_dango:20210612215537p:plain
※サンプリング 0.005sec
グラフ1つ目がフィルタの周波数特性
グラフ2つ目がフィルタの位相特性

評価用のサンプル信号について

フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力するコードです。
サイン波にノイズを足したものをサンプル信号とします。

dt = 0.005 #1stepの時間[sec]
fs = 1/dt
times  =  np.arange(0,10,dt)
N = times.shape[0]

f  = 5  #サイン波の周波数[Hz]
sigma  = 0.5 #ノイズの分散

np.random.seed(1)
# サイン波
x_s =np.sin(2 * np.pi * times * f) 
x = x_s  +  sigma * np.random.randn(N)

x_np = np.array(x)

# サイン波+ノイズ
y_s =  np.zeros(times.shape[0])
y_s[:times.shape[0]//2] = 1
y = y_s  +  sigma * np.random.randn(N)

plt.figure(figsize=(12,6))
plt.subplot(3,1,1)
plt.plot(times, x, c="red", label=("sin+noise"))

#軸調整
plt.xlim(0,1)
plt.yticks([-2, -1.5, -1, -0.5, 0, 0.5,1, 1.5, 2])
plt.grid(which = "major", axis = "y", color = "gray", alpha = 0.8,
        linestyle = "--", linewidth = 1)
plt.legend()

def FFT(Raw_Data, sampling_time):
    sampling_time_f = 1/sampling_time
    fft_nseg = 256
    fft_overlap = fft_nseg // 2
    f_raw, t_raw, Sxx_raw = signal.stft(Raw_Data, sampling_time_f, nperseg=fft_nseg, noverlap=fft_overlap)

    return f_raw, t_raw, Sxx_raw

def get_start_end_index(t, start_t, end_t):
    start_ind=0
    while(t[start_ind] < start_t):
        start_ind += 1
    end_ind = start_ind + 1
    while (t[end_ind] < end_t):
        end_ind += 1
    return start_ind, end_ind

def ave_spectrum(Sxx, t, start_t, end_t):
    start_ind, end_ind = get_start_end_index(t, start_t, end_t)
    ave_spectrum = np.zeros(Sxx.shape[0])
    for i in range(Sxx.shape[0]):
        ave_spectrum[i] = np.average(Sxx[i, start_ind:end_ind])
    return ave_spectrum

f_raw, t_raw, Sxx_raw = FFT(x_np, dt)

plt.subplot(3,1,2)
plt.pcolormesh(t_raw, f_raw, np.abs(Sxx_raw),vmin=0, vmax=0.4, cmap="jet")

plt.subplot(3,1,3)
start = 0
end = 2
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="blue", label=("normal"))
plt.legend()

# プロット表示(設定の反映)
plt.show()


実行結果は以下の通りです。
f:id:toku_dango:20210612231052p:plain
グラフ1つ目がサンプル信号の時系列です
グラフ2つ目がサンプル信号の周波数特性を示した時系列です。このグラフは関数「FFT」の出力結果で、横軸:時間 縦軸:周波数 色が出力(db)を示しており、周波数特性を時系列的に見たい場合に有効です。この画像だと5Hz付近で赤色となっており、グラフ3つ目の周波数特性とも合っていることが分かります。
グラフ3つ目がサンプル信号の周波数特性です。「start」「end」に時間を入力し、その時間範囲のデータでの周波数特性を示します。関数「FFT」の結果を関数「get_start_end_index」「ave_spectrum」で切り分け、平均化処理させた結果を描画しています。

移動平均フィルタについて

移動平均フィルタは、ローパスフィルタとして活用されます。
以下がサンプルコードとなり、フィルタ評価に使用するサンプル信号とサンプル信号の周波数特性を出力する関数です。
「move_num」移動平均個数であり、この値を変えていただくと、任意のフィルタについて評価できます。
「dt」サンプリングタイムであり、今回はサンプル信号に合わせています。
関数「character_MA」がフィルタ特性の出力用関数
関数「MA_filter」が実際のフィルタ処理用関数になります。

move_num = 5 #移動平均個数設定
dt = 0.005 #SamplingTime
fs = 1/dt  #周波数

class MA_Filter():
    def __init__(self, master=None):
        print("MA_Filter")

    def character_MA(self, N, fs, a=1):
        b = np.ones(N) / N
        fn = fs / 2# ナイキスト周波数計算

        w, h = signal.freqz(b, a)# 周波数応答計算
        x_freq_rspns = w / np.pi * fn
        y_freq_rspns = db(abs(h))#複素数をデシベル変換

        p = np.angle(h) * 180 / np.pi#位相特性
        w, gd = signal.group_delay([b, a])# 群遅延計算
        x_gd = w / np.pi * fn
        y_gd = gd

        return x_freq_rspns, y_freq_rspns, p, x_gd, y_gd

    def MA_filter(self, x, move_num):
        move_ave = []
        ring_buff = np.zeros(move_num)
        for i in range(len(x)):
            num = i % move_num
            if i < move_num:
                ring_buff[num] = x[i]
                move_ave.append(0)
            else:
                ring_buff[num] = x[i]
                ave = np.mean(ring_buff)
                move_ave.append(ave)
        return move_ave

MFilter = MA_Filter()

フィルタ特性について

フィルタ特性確認用のサンプルコードです。

x_freq_rspns_ave, y_freq_rspns_ave, p_ave, x_gd_ave, y_gd_ave = MFilter.character_MA(move_num, fs, a=1)

# グラフ表示
plt.figure(figsize=(12,6))
# 周波数特性
plt.subplot(3, 1, 1)
plt.plot(x_freq_rspns_ave, y_freq_rspns_ave)
plt.xlim(0, 100)
plt.ylim(-200, 2)
plt.ylabel('Amplitude [dB]')
plt.grid()

# 位相特性
plt.subplot(3, 1, 2)
plt.plot(x_freq_rspns_ave, p_ave)
plt.xlim(0, 100)
plt.ylim(-200, 200)
plt.xlabel('Frequency [Hz]')
plt.ylabel('Phase [deg]')
plt.grid()

# 群遅延プロット
plt.subplot(3, 1, 3)
plt.plot(x_gd_ave, y_gd_ave)
plt.xlim(0, 100)
plt.ylim(-5, 15)
plt.xlabel('Frequency [Hz]')
plt.ylabel('Group delay [samples]')
plt.grid()
plt.show()


実行結果は以下の通りです。
f:id:toku_dango:20210612235805p:plain
グラフ1つ目がフィルタの周波数特性
グラフ2つ目がフィルタの位相特性
グラフ3つ目がフィルタの群遅延を示しています。
群遅延は、フィルタ処理によって何サンプル分信号が遅れるのかを示しており、その確認ができます。

フィルタ処理結果の確認

フィルタ処理結果確認用のサンプルコードです。
「x」に任意の信号を入れていただくことで処理ができますので、処理したい信号を「x」に入れて試してみてください。

moveave_x = MFilter.MA_filter(x, move_num)

plt.figure(figsize=(12,6))
plt.subplot(3,1,1)
plt.plot(times, x, c="red", label=("sin+noise"))
plt.plot(times, x_s, c="blue", label=("sin"))
plt.plot(times, moveave_x, c="green", label=("MA"))

#軸調整
plt.xlim(0,1)
plt.yticks([-2, -1.5, -1, -0.5, 0, 0.5,1, 1.5, 2])
plt.grid(which = "major", axis = "y", color = "gray", alpha = 0.8,
        linestyle = "--", linewidth = 1)
plt.legend()

f_raw, t_raw, Sxx_raw = FFT(x_np, dt)
plt.subplot(3,1,2)
plt.pcolormesh(t_raw, f_raw, np.abs(Sxx_raw),vmin=0, vmax=0.4, cmap="jet")

plt.subplot(3,1,3)
start = 0
end = 2
f_raw, t_raw, Sxx_raw = FFT(x, dt)
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="red", label=("sin+noise"))
f_raw, t_raw, Sxx_raw = FFT(x_s, dt)
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="blue", label=("sin"))
f_raw, t_raw, Sxx_raw = FFT(moveave_x, dt)
gain = ave_spectrum(np.abs(Sxx_raw), t_raw, start, end)
plt.plot(f_raw, gain, c="green", label=("MA"))
plt.legend()

# プロット表示(設定の反映)
plt.show()


実行結果は以下の通りです。
f:id:toku_dango:20210613000011p:plain
グラフ1つ目の緑プロットが移動平均フィルタ後の出力となります。※フィルタ処理前が赤プロット
グラフ2つ目がフィルタ後出力の周波数特性を示した時系列です。
グラフ3つ目の緑プロットが移動平均フィルタ後の周波数特性になります。
今回作成した移動平均フィルタのフィルタ特性を見ると、20Hz付近から減衰するようなフィルタ特性を持っており、グラフ3つ目の緑プロットと赤プロットを比較して、設計通りのフィルタが作成できていることが分かります。

まとめ

今回はScipyを使ったディジタルフィルタの設計・評価で使えるコードを紹介しました。
フィルタ特性を確認と評価が合わせてできるように作成したので、使えるケースもあるかもしれません。
FIRの別のフィルタについても書いていく予定です。

Rasberry PiとSORACOMサービスとで異常検知システムを作成 2/2 (機能の実装編)

f:id:toku_dango:20210515193032p:plain
続いては、「機能の実装」 についてまとめていきます。

目次


実施したいこと(再掲)

作業者の異常を検知して、管理者に通知するシステムの作成が目標となるため、以下機能の実現を目指しました。

機能①:作業者(人)を認識して、異常を検知する
機能②:異常を検知したら、その旨をスマートスピーカーで管理者に通知する
機能③:異常を検知したら、状況の画像と位置情報をLINEで管理者に通知する
機能④:異常を検知したら、状況の画像と位置情報をSORACOM Lagoonダッシュボードで表示
おまけ機能:新型コロナ対策 密集検知機能(規定人数以上の人が会議室内にいる場合にその旨を管理者に通知する)

人の認識機能を有した市販の監視カメラは最近出てきましたが、まだまだ高価ですね。また、その中でも異常を検知する機能を有したものもそこまで多くないと思います。

今回SORACOMサービスを使用した理由としては、Wi-Fiが届かない様々な場所にも設置することを想定しているからです。
SORACOMではRaspberry Pi等に接続し、セルラー通信が可能となるUSBドングルがあるので、今回はそれを使用しています。

下記に本システムの概要図を示します。

f:id:toku_dango:20210508152246p:plain
システム概要図

機能の実装に向けて

  1. システム概要とセットアップ
  2. 機能の実装 (本ページ)


1. 異常検知の実装

異常検知は、人物検知と動体検知を組み合わせて実施します。
人物検知は、前回説明した物体検知の機能を使います。
動体検知は、動いている物体を検知することで、時刻違いの画像(前画像と現画像)の差分を求めることで検知します。
今回は、作業員が単独作業中に動けなくなることを異常状態と定義し、人物検知による作業員数を把握/動体検知によって、異常状態を判断します。
人間の姿勢推定して倒れているかどうかを判断する等、他にも良い方法もありそうなのですが、今回はプロトタイプとしてこの方法とします。

f:id:toku_dango:20210516162037p:plain

機能追加のためコード修正は、「Object Detection Tools」の「object_detection.py」をベースとして修正していきます。
コードが長いため、完全なソースコードここ「employee_anomaly_detection.py」に示します。「object_detection.py」から追加した機能についての説明を以下にまとめています。
抜粋した箇所(機能として働く関数箇所)の説明になるため、コードを試す場合は上記コードをダウンロードして実行することをお勧めします。

コードをダウンロードして実行をする場合は以下の通り実行ください。

$ cd ~\EmployeeAnomalyDetection\ObjectDetection\scripts
$ python3 employee_anomaly_detection.py#実行用コード

実行後に以下の通りウィンドウが現れ、異常検知(Emergency_Judge)と密集検知(Capacity_Judge)のカウンターが表示されます。
Tensor flowに関連した警告が出るかもしれませんが、実行上は問題がないためとりあえずは無視しています
f:id:toku_dango:20210516153819p:plain

1.1 人物検知について

人物検知は「Object Detection Tools」物体検知の機能を使わせていただくので、
機能自体に修正はないのですが、「employee_anomaly_detection.py」では人数をカウントする機能を追加します。
object_detection.py内の182行目~187行で検出した物体名を出力しており、
検出した物の名前をlistに格納させます。
181行と188行、216行辺りに以下の通りコードを追加。

      # convert bgr to rgb
      image_np = img_bgr[:,:,::-1]
      image_np_expanded = np.expand_dims(image_np, axis=0)
      start = time.time()
      output_dict = run_inference_for_single_image(image_np_expanded, detection_graph)
      elapsed_time = time.time() - start
        
      befDetectList = [] #<<<<<追加する 検出した物体名を格納するリストの定義
      personNum = 0 #<<<<<追加する 検出人数を0で置く

      for i in range(output_dict['num_detections']):
        class_id = output_dict['detection_classes'][i]
        if class_id < len(labels):
          label = labels[class_id]
        else:
          label = 'unknown'
          
        befDetectList.append(label)#<<<<<追加する 検出した物体名をリストに格納

        detection_score = output_dict['detection_scores'][i]

        if detection_score > 0.5:
            # Define bounding box
            h, w, c = img.shape
            box = output_dict['detection_boxes'][i] * np.array( \
              [h, w,  h, w])
            box = box.astype(np.int)

            speed_info = '%s: %.3f' % ('fps', 1.0/elapsed_time)
            cv2.putText(img, speed_info, (10,50), \
              cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)

            if mode == 'bbox':
              class_id = class_id % len(colors)
              color = colors[class_id]

              # Draw bounding box
              cv2.rectangle(img, \
                (box[1], box[0]), (box[3], box[2]), color, 3)

              # Put label near bounding box
              information = '%s: %.1f%%' % (label, output_dict['detection_scores'][i] * 100.0)
              cv2.putText(img, information, (box[1] + 15, box[2] - 15), \
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 1, cv2.LINE_AA)
            elif mode == 'mosaic':
              img = mosaic_area(img, box[1], box[0], box[3], box[2], ratio=0.05)

            if label ==  "person": #<<<<<追加する
              personNum += 1  #<<<<<追加する 検出した人物数を数える

1.2 動体検知について

OpencCVモジュールの関数を使用しています。
動体検知までの基本的な流れは、以下の通りです。

  1. 画像をグレースケールに変換
  2. 時刻違いの画像(前画像と現画像)の差分を計算
  3. 上記差分値としきい値とを比較して動体を認識(検知)

処理の詳細は、以下で詳しく解説されています。
developers.cyberagent.co.jp

「employee_anomaly_detection.py」の動体検知に関するコードは以下の通りです。

moveThreshold=1000 #<<<<<<動体検知 しきい値
avg=None

def moveDetect(img, avg):
    moveFlg = 0
    #画像をグレースケールに変換
    grayImg=cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    #前回画像がない場合の処理
    if avg is None:
        avg = grayImg.copy().astype("float")
        return avg, moveFlg

    #前画像との差分を取得する
    cv2.accumulateWeighted(grayImg, avg, 0.00001)
    delta = cv2.absdiff(grayImg, cv2.convertScaleAbs(avg))
    thresh = cv2.threshold(delta, 50, 255, cv2.THRESH_BINARY)[1]
    contours, h = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)  

    #画像内 差分箇所のうち最大箇所を抽出
    max_area=0
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if max_area < area:
            max_area = area

    #動体判定
    if max_area > moveThreshold:
        moveFlg = 1

    #今回取得画像を保存
    avg = grayImg.copy().astype("float")

    return avg, moveFlg

変数moveThreshold が動体検知のしきい値であり、動体検知がうまくいかない場合は
必要に応じて変更ください。

1.3 異常検知について

異常検知までの流れは、以下の通りです。

  1. 人物検知にて画面に映る人数が1名の場合(1.1 人物検知の変数 personNum==1)、かつ動体検知機能で動体が映らない時(1.2 動体検知の変数 moveFlg = 0)に異常検知カウンターを上げる。
  2. 異常検知カウンターがしきい値を超える、かつ最初にカウンターを上げた時から規定時間以上 時間が経っている場合に異常検知フラグを立てる


「employee_anomaly_detection.py」異常検知に関するコードは以下の通りです。
今回は、異常検知カウンターが60以上でかつ3分以上時間が経っている場合に異常検知フラグが立つように設定しています。

import datetime #<<<<<<モジュールのimportが必要です

#emergencyJudge
BUILDING_NAME = "A" #建物番号
ROOM_NUM = "202" #部屋番号
interval=300 #通知間隔

MovBefTimes=[0,0,0,0,0,0]
MovJudgeCounter = 0 #<<<<<<異常検知カウンター
MovResetTime=30 #<<<<<<異常検知カウンター リセット時間
MovAlertThreshold = 60 #<<<<<<異常検知 カウンター しきい値
MovAlertWaitTime = 180 #<<<<<<通知待機時間設定
#画像の格納パス
pictpath='/home/pi/Desktop/CheckView/ObjectDetection/picts' #<<<<<<画像を保存するパス

#現在日付を取得
nowstr=datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
nowTime=time.time()

def emergencyJudge(img, nowTime, MovJudgeCounter):

    if MovJudgeCounter == 0:
        MovJudgeCounter = 1
        #初回検知時間を保存
        MovBefTimes[0]=int(nowTime)
    else:
        if int(nowTime) - MovBefTimes[1] < MovResetTime:
             MovJudgeCounter += 1
        else:
            MovJudgeCounter = 0
    print("MoveCounter=", MovJudgeCounter)
    #検知時間を保存
    MovBefTimes[1]=int(nowTime)
        
    #しきい値及び経過時間が規定以上になった場合に通知
    if MovJudgeCounter >= MovAlertThreshold and int(nowTime) - MovBefTimes[0] > MovAlertWaitTime:
        if int(nowTime) - MovBefTimes[2] > interval:
            #通知時間を保存
            MovBefTimes[1]=int(nowTime)
            #ファイルに保存 
            filename=pictpath+'/'+nowstr+'_EmergencyCall.png' #<<<<<<画像を保存する
            cv2.imwrite(filename, img)
            """通知機能を使う場合はコメントアウトを外す
            messageLine = "従業員異常通知_会議室" + BUILDING_NAME + ROOM_NUM
            messageAlexa = BUILDING_NAME + ROOM_NUM + "従業員の異常を確認しました。状況を確認してください。"
            lineMessage(filename, messageLine)
            alexaMessage(messageAlexa)
            sendSoracom(filename)
            """
                
    return MovJudgeCounter


1.4 おまけ機能について

おまけ機能は、新型コロナ対策 密集検知機能です。
会議室にカメラを設置し、規定人数以上の人が会議室内にいる場合にその旨を管理者に通知します。

密集検知までの流れは、以下の通りです。

  1. 人物検知にて会議室内の人数をカウント(1.1 人物検知の変数 personNumに対応)
  2. 上記を会議室内の人数として、規定人数以上場合に密集検知フラグを立てる


「employee_anomaly_detection.py」の密集検知用の関数は以下の通りです。
今回は規定人数を3人としています (CAPACITY_NUM = 3)

import datetime #<<<<<<モジュールのimportが必要です
#capacityJudge
BUILDING_NAME = "A" #建物番号
ROOM_NUM = "202" #部屋番号
interval=300 #通知間隔

CaBefTimes=[0,0,0,0,0,0]
CAPACITY_NUM = 3 #<<<<<<密集検知 しきい値 (3人以上で密集検知カウンターを上げる)
CaJudgeCounter = 0 #<<<<<<密集検知 カウンター
CaResetTime=20 #<<<<<<密集検知カウンター リセット時間
CaAlertThreshold = 60 #<<<<<<密集検知 カウンター しきい値

def capacityJudge(personNum, img, nowTime, CaJudgeCounter):
    if personNum >= (CAPACITY_NUM / 2):
        
        if CaJudgeCounter == 0:
            CaJudgeCounter = 1
        else:
            if int(nowTime) - CaBefTimes[0] < CaResetTime:
                CaJudgeCounter += 1
            else:
                CaJudgeCounter = 0
        #検知時間を保存
        CaBefTimes[0]=int(nowTime)
        print("CapacityCounter=", CaJudgeCounter)
            
        #通知(一定時間間隔)
        if CaJudgeCounter >= CaAlertThreshold:
            if int(nowTime) - CaBefTimes[1] > interval:
                #通知時間を保存
                CaBefTimes[1]=int(nowTime)
                #ファイルに保存 
                filename=pictpath+'/'+nowstr+'MaxCapacity.png'#<<<<<<画像を保存する
                cv2.imwrite(filename, img)
                """通知機能を使う場合はコメントアウトを外す
                messageLine = "密集防止アラート_会議室" + BUILDING_NAME + ROOM_NUM
                messageAlexa = BUILDING_NAME + ROOM_NUM + "会議室の収容人数を超過しています。注意喚起を実施ください。"
                lineMessage(filename, messageLine)
                alexaMessage(messageAlexa)
                sendSoracom(filename)
                """

    return CaJudgeCounter


2. 通知機能の実装

通知機能は、以下3種類で構成されており、異常検知フラグ及び密集検知フラグが立った場合に動かします。

  • Amazon Echoで音声通知
  • LINEで状況の画像と位置情報を通知
  • SORACOM Lagoonダッシュボードで状況の画像と位置情報を表示

2.1 Amazon Echoで音声通知

Amazon Echoを喋らせる方法は、以前の記事にまとめており、「alexa_remote_control」の機能を使います。スマートスピーカーAPIを使えば、非常に簡単に音声通知ができますね。
「employee_anomaly_detection.py」の対象となる関数は以下の通りです。
※以下に示すコードの他、同じパスに置く「alexa-remote_control.sh」ファイル内の設定も必要です。

import subprocess #<<<<<<モジュールのimportが必要です
import requests #<<<<<<モジュールのimportが必要です

def alexaMessage(message):
    message = "speak:" + message
    cmd = ["./alexa_remote_control.sh", "-e", message] #messageに通知したい文字を入れる
    res = subprocess.call(cmd)


2.2 LINEで状況の画像と位置情報を通知

LINEで通知する方法も、以前の記事にまとめており、こちらは「LINE Notify」の機能を使います。
「employee_anomaly_detection.py」の対象となる関数は以下の通りです。

def lineMessage(fname, message):
    url = "https://notify-api.line.me/api/notify"
    token = [""] #ここにLINE Notifyのトークンを入力
    for i in token:
        headers = {"Authorization" : "Bearer "+ i}
        payload = {"message" :  message} 
        files = {"imageFile": open(fname, "rb")}
        r = requests.post(url, headers = headers, params=payload, files=files)
        print(r.text)


通知結果
異常検知後に画像とメッセージがLINEに送られます。
※密集検知後も同様に画像とメッセージがLINEに送られます。
f:id:toku_dango:20210515214900p:plain

2.3 SORACOM Lagoonダッシュボードで状況の画像と位置情報を表示

SORACOM Lagoonダッシュボードで、状況が確認できるように設定します。
具体的には、以下のように設定をしました。
f:id:toku_dango:20210515224106p:plain
異常検知、密集検知後に画像と位置情報がSORACOM側に送られ、ダッシュボードで表示されます。
※位置情報は、サンプルとして大阪城に設定しています。

2.3.1 ダッシュボードの設定

ダッシュボードは、SORACOM LagoonのUIを使って設定します。
簡単に設定できますので、公式ページ(以下URL)を参考にして下さい。
users.soracom.io

画像ファイルを SORACOM Lagoonで表示させる方法も別ページで詳しく説明されています。
Getting Started: Harvest Files の画像を SORACOM Lagoon で表示する | SORACOM Harvest | SORACOM Users

2.3.2 SORACOM Harbest Data/Harvest Filesにファイルやデータを送る

SORACOM Lagoon ダッシュボードでファイルやデータを表示させます。
今回は、以下ファイルとデータを送ります。

ファイル: 画像ファイル
データ: 緯度経度情報, 部屋番号


「employee_anomaly_detection.py」の対象となる関数は以下の通りです。

def sendSoracom(filename):
    send_imageFile ='curl -v -X PUT --data-binary @'  + filename + ' -H content-type:images/png http://harvest-files.soracom.io/lagoon/harvest.png' #<<<<<<画像ファイルを送る
    send_message ='curl -v -X POST -H content-type:application/json -d {\"latitude\":34.6872318,\"longitude\":135.5259173,\"room_num\":' + ROOM_NUM + '} http://harvest.soracom.io' #<<<<<<緯度経度/部屋番号情報を送る(緯度経度は、サンプルとして大阪城の位置情報となっています)
    res = subprocess.call(send_message.split())
    res = subprocess.call(send_imageFile.split())


おわりに

今回は、人物検知と動体検知機能と組み合わせることで、
異常・密集判定し、3種類の方法で通知ができるようにしました。
異常検知に際して、カメラを作業現場に固定しておくことができるため
「装置を装着する等の作業員の追加作業が必要ない」という目標は、達成できたのではないかと思います。
ただ、「異常を正しく検知する」に関しては、まだまだ改善の余地がありそうなので、今後の課題と考えています。
機能の実装については、部分的な説明で大変申し訳ありませんが、試しに使ってみたい方は「employee_anomaly_detection.py」を使って試してください。

物体検知機能を使用すると、他にも色々なことができそうなので、今後も応用方法を考えて実行したいと思います。