まるっとワーク

統計・機械学習・電子工作など気になることを残していきます

Rasberry PiとSORACOMサービスとで異常検知システムを作成 2/2 (機能の実装編)

f:id:toku_dango:20210515193032p:plain
続いては、「機能の実装」 についてまとめていきます。

目次


実施したいこと(再掲)

作業者の異常を検知して、管理者に通知するシステムの作成が目標となるため、以下機能の実現を目指しました。

機能①:作業者(人)を認識して、異常を検知する
機能②:異常を検知したら、その旨をスマートスピーカーで管理者に通知する
機能③:異常を検知したら、状況の画像と位置情報をLINEで管理者に通知する
機能④:異常を検知したら、状況の画像と位置情報をSORACOM Lagoonダッシュボードで表示
おまけ機能:新型コロナ対策 密集検知機能(規定人数以上の人が会議室内にいる場合にその旨を管理者に通知する)

人の認識機能を有した市販の監視カメラは最近出てきましたが、まだまだ高価ですね。また、その中でも異常を検知する機能を有したものもそこまで多くないと思います。

今回SORACOMサービスを使用した理由としては、Wi-Fiが届かない様々な場所にも設置することを想定しているからです。
SORACOMではRaspberry Pi等に接続し、セルラー通信が可能となるUSBドングルがあるので、今回はそれを使用しています。

下記に本システムの概要図を示します。

f:id:toku_dango:20210508152246p:plain
システム概要図

機能の実装に向けて

  1. システム概要とセットアップ
  2. 機能の実装 (本ページ)


1. 異常検知の実装

異常検知は、人物検知と動体検知を組み合わせて実施します。
人物検知は、前回説明した物体検知の機能を使います。
動体検知は、動いている物体を検知することで、時刻違いの画像(前画像と現画像)の差分を求めることで検知します。
今回は、作業員が単独作業中に動けなくなることを異常状態と定義し、人物検知による作業員数を把握/動体検知によって、異常状態を判断します。
人間の姿勢推定して倒れているかどうかを判断する等、他にも良い方法もありそうなのですが、今回はプロトタイプとしてこの方法とします。

f:id:toku_dango:20210516162037p:plain

機能追加のためコード修正は、「Object Detection Tools」の「object_detection.py」をベースとして修正していきます。
コードが長いため、完全なソースコードここ「employee_anomaly_detection.py」に示します。「object_detection.py」から追加した機能についての説明を以下にまとめています。
抜粋した箇所(機能として働く関数箇所)の説明になるため、コードを試す場合は上記コードをダウンロードして実行することをお勧めします。

コードをダウンロードして実行をする場合は以下の通り実行ください。

$ cd ~\EmployeeAnomalyDetection\ObjectDetection\scripts
$ python3 employee_anomaly_detection.py#実行用コード

実行後に以下の通りウィンドウが現れ、異常検知(Emergency_Judge)と密集検知(Capacity_Judge)のカウンターが表示されます。
Tensor flowに関連した警告が出るかもしれませんが、実行上は問題がないためとりあえずは無視しています
f:id:toku_dango:20210516153819p:plain

1.1 人物検知について

人物検知は「Object Detection Tools」物体検知の機能を使わせていただくので、
機能自体に修正はないのですが、「employee_anomaly_detection.py」では人数をカウントする機能を追加します。
object_detection.py内の182行目~187行で検出した物体名を出力しており、
検出した物の名前をlistに格納させます。
181行と188行、216行辺りに以下の通りコードを追加。

      # convert bgr to rgb
      image_np = img_bgr[:,:,::-1]
      image_np_expanded = np.expand_dims(image_np, axis=0)
      start = time.time()
      output_dict = run_inference_for_single_image(image_np_expanded, detection_graph)
      elapsed_time = time.time() - start
        
      befDetectList = [] #<<<<<追加する 検出した物体名を格納するリストの定義
      personNum = 0 #<<<<<追加する 検出人数を0で置く

      for i in range(output_dict['num_detections']):
        class_id = output_dict['detection_classes'][i]
        if class_id < len(labels):
          label = labels[class_id]
        else:
          label = 'unknown'
          
        befDetectList.append(label)#<<<<<追加する 検出した物体名をリストに格納

        detection_score = output_dict['detection_scores'][i]

        if detection_score > 0.5:
            # Define bounding box
            h, w, c = img.shape
            box = output_dict['detection_boxes'][i] * np.array( \
              [h, w,  h, w])
            box = box.astype(np.int)

            speed_info = '%s: %.3f' % ('fps', 1.0/elapsed_time)
            cv2.putText(img, speed_info, (10,50), \
              cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)

            if mode == 'bbox':
              class_id = class_id % len(colors)
              color = colors[class_id]

              # Draw bounding box
              cv2.rectangle(img, \
                (box[1], box[0]), (box[3], box[2]), color, 3)

              # Put label near bounding box
              information = '%s: %.1f%%' % (label, output_dict['detection_scores'][i] * 100.0)
              cv2.putText(img, information, (box[1] + 15, box[2] - 15), \
                cv2.FONT_HERSHEY_SIMPLEX, 1, color, 1, cv2.LINE_AA)
            elif mode == 'mosaic':
              img = mosaic_area(img, box[1], box[0], box[3], box[2], ratio=0.05)

            if label ==  "person": #<<<<<追加する
              personNum += 1  #<<<<<追加する 検出した人物数を数える

1.2 動体検知について

OpencCVモジュールの関数を使用しています。
動体検知までの基本的な流れは、以下の通りです。

  1. 画像をグレースケールに変換
  2. 時刻違いの画像(前画像と現画像)の差分を計算
  3. 上記差分値としきい値とを比較して動体を認識(検知)

処理の詳細は、以下で詳しく解説されています。
developers.cyberagent.co.jp

「employee_anomaly_detection.py」の動体検知に関するコードは以下の通りです。

moveThreshold=1000 #<<<<<<動体検知 しきい値
avg=None

def moveDetect(img, avg):
    moveFlg = 0
    #画像をグレースケールに変換
    grayImg=cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    #前回画像がない場合の処理
    if avg is None:
        avg = grayImg.copy().astype("float")
        return avg, moveFlg

    #前画像との差分を取得する
    cv2.accumulateWeighted(grayImg, avg, 0.00001)
    delta = cv2.absdiff(grayImg, cv2.convertScaleAbs(avg))
    thresh = cv2.threshold(delta, 50, 255, cv2.THRESH_BINARY)[1]
    contours, h = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)  

    #画像内 差分箇所のうち最大箇所を抽出
    max_area=0
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if max_area < area:
            max_area = area

    #動体判定
    if max_area > moveThreshold:
        moveFlg = 1

    #今回取得画像を保存
    avg = grayImg.copy().astype("float")

    return avg, moveFlg

変数moveThreshold が動体検知のしきい値であり、動体検知がうまくいかない場合は
必要に応じて変更ください。

1.3 異常検知について

異常検知までの流れは、以下の通りです。

  1. 人物検知にて画面に映る人数が1名の場合(1.1 人物検知の変数 personNum==1)、かつ動体検知機能で動体が映らない時(1.2 動体検知の変数 moveFlg = 0)に異常検知カウンターを上げる。
  2. 異常検知カウンターがしきい値を超える、かつ最初にカウンターを上げた時から規定時間以上 時間が経っている場合に異常検知フラグを立てる


「employee_anomaly_detection.py」異常検知に関するコードは以下の通りです。
今回は、異常検知カウンターが60以上でかつ3分以上時間が経っている場合に異常検知フラグが立つように設定しています。

import datetime #<<<<<<モジュールのimportが必要です

#emergencyJudge
BUILDING_NAME = "A" #建物番号
ROOM_NUM = "202" #部屋番号
interval=300 #通知間隔

MovBefTimes=[0,0,0,0,0,0]
MovJudgeCounter = 0 #<<<<<<異常検知カウンター
MovResetTime=30 #<<<<<<異常検知カウンター リセット時間
MovAlertThreshold = 60 #<<<<<<異常検知 カウンター しきい値
MovAlertWaitTime = 180 #<<<<<<通知待機時間設定
#画像の格納パス
pictpath='/home/pi/Desktop/CheckView/ObjectDetection/picts' #<<<<<<画像を保存するパス

#現在日付を取得
nowstr=datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
nowTime=time.time()

def emergencyJudge(img, nowTime, MovJudgeCounter):

    if MovJudgeCounter == 0:
        MovJudgeCounter = 1
        #初回検知時間を保存
        MovBefTimes[0]=int(nowTime)
    else:
        if int(nowTime) - MovBefTimes[1] < MovResetTime:
             MovJudgeCounter += 1
        else:
            MovJudgeCounter = 0
    print("MoveCounter=", MovJudgeCounter)
    #検知時間を保存
    MovBefTimes[1]=int(nowTime)
        
    #しきい値及び経過時間が規定以上になった場合に通知
    if MovJudgeCounter >= MovAlertThreshold and int(nowTime) - MovBefTimes[0] > MovAlertWaitTime:
        if int(nowTime) - MovBefTimes[2] > interval:
            #通知時間を保存
            MovBefTimes[1]=int(nowTime)
            #ファイルに保存 
            filename=pictpath+'/'+nowstr+'_EmergencyCall.png' #<<<<<<画像を保存する
            cv2.imwrite(filename, img)
            """通知機能を使う場合はコメントアウトを外す
            messageLine = "従業員異常通知_会議室" + BUILDING_NAME + ROOM_NUM
            messageAlexa = BUILDING_NAME + ROOM_NUM + "従業員の異常を確認しました。状況を確認してください。"
            lineMessage(filename, messageLine)
            alexaMessage(messageAlexa)
            sendSoracom(filename)
            """
                
    return MovJudgeCounter


1.4 おまけ機能について

おまけ機能は、新型コロナ対策 密集検知機能です。
会議室にカメラを設置し、規定人数以上の人が会議室内にいる場合にその旨を管理者に通知します。

密集検知までの流れは、以下の通りです。

  1. 人物検知にて会議室内の人数をカウント(1.1 人物検知の変数 personNumに対応)
  2. 上記を会議室内の人数として、規定人数以上場合に密集検知フラグを立てる


「employee_anomaly_detection.py」の密集検知用の関数は以下の通りです。
今回は規定人数を3人としています (CAPACITY_NUM = 3)

import datetime #<<<<<<モジュールのimportが必要です
#capacityJudge
BUILDING_NAME = "A" #建物番号
ROOM_NUM = "202" #部屋番号
interval=300 #通知間隔

CaBefTimes=[0,0,0,0,0,0]
CAPACITY_NUM = 3 #<<<<<<密集検知 しきい値 (3人以上で密集検知カウンターを上げる)
CaJudgeCounter = 0 #<<<<<<密集検知 カウンター
CaResetTime=20 #<<<<<<密集検知カウンター リセット時間
CaAlertThreshold = 60 #<<<<<<密集検知 カウンター しきい値

def capacityJudge(personNum, img, nowTime, CaJudgeCounter):
    if personNum >= (CAPACITY_NUM / 2):
        
        if CaJudgeCounter == 0:
            CaJudgeCounter = 1
        else:
            if int(nowTime) - CaBefTimes[0] < CaResetTime:
                CaJudgeCounter += 1
            else:
                CaJudgeCounter = 0
        #検知時間を保存
        CaBefTimes[0]=int(nowTime)
        print("CapacityCounter=", CaJudgeCounter)
            
        #通知(一定時間間隔)
        if CaJudgeCounter >= CaAlertThreshold:
            if int(nowTime) - CaBefTimes[1] > interval:
                #通知時間を保存
                CaBefTimes[1]=int(nowTime)
                #ファイルに保存 
                filename=pictpath+'/'+nowstr+'MaxCapacity.png'#<<<<<<画像を保存する
                cv2.imwrite(filename, img)
                """通知機能を使う場合はコメントアウトを外す
                messageLine = "密集防止アラート_会議室" + BUILDING_NAME + ROOM_NUM
                messageAlexa = BUILDING_NAME + ROOM_NUM + "会議室の収容人数を超過しています。注意喚起を実施ください。"
                lineMessage(filename, messageLine)
                alexaMessage(messageAlexa)
                sendSoracom(filename)
                """

    return CaJudgeCounter


2. 通知機能の実装

通知機能は、以下3種類で構成されており、異常検知フラグ及び密集検知フラグが立った場合に動かします。

  • Amazon Echoで音声通知
  • LINEで状況の画像と位置情報を通知
  • SORACOM Lagoonダッシュボードで状況の画像と位置情報を表示

2.1 Amazon Echoで音声通知

Amazon Echoを喋らせる方法は、以前の記事にまとめており、「alexa_remote_control」の機能を使います。スマートスピーカーAPIを使えば、非常に簡単に音声通知ができますね。
「employee_anomaly_detection.py」の対象となる関数は以下の通りです。
※以下に示すコードの他、同じパスに置く「alexa-remote_control.sh」ファイル内の設定も必要です。

import subprocess #<<<<<<モジュールのimportが必要です
import requests #<<<<<<モジュールのimportが必要です

def alexaMessage(message):
    message = "speak:" + message
    cmd = ["./alexa_remote_control.sh", "-e", message] #messageに通知したい文字を入れる
    res = subprocess.call(cmd)


2.2 LINEで状況の画像と位置情報を通知

LINEで通知する方法も、以前の記事にまとめており、こちらは「LINE Notify」の機能を使います。
「employee_anomaly_detection.py」の対象となる関数は以下の通りです。

def lineMessage(fname, message):
    url = "https://notify-api.line.me/api/notify"
    token = [""] #ここにLINE Notifyのトークンを入力
    for i in token:
        headers = {"Authorization" : "Bearer "+ i}
        payload = {"message" :  message} 
        files = {"imageFile": open(fname, "rb")}
        r = requests.post(url, headers = headers, params=payload, files=files)
        print(r.text)


通知結果
異常検知後に画像とメッセージがLINEに送られます。
※密集検知後も同様に画像とメッセージがLINEに送られます。
f:id:toku_dango:20210515214900p:plain

2.3 SORACOM Lagoonダッシュボードで状況の画像と位置情報を表示

SORACOM Lagoonダッシュボードで、状況が確認できるように設定します。
具体的には、以下のように設定をしました。
f:id:toku_dango:20210515224106p:plain
異常検知、密集検知後に画像と位置情報がSORACOM側に送られ、ダッシュボードで表示されます。
※位置情報は、サンプルとして大阪城に設定しています。

2.3.1 ダッシュボードの設定

ダッシュボードは、SORACOM LagoonのUIを使って設定します。
簡単に設定できますので、公式ページ(以下URL)を参考にして下さい。
users.soracom.io

画像ファイルを SORACOM Lagoonで表示させる方法も別ページで詳しく説明されています。
Getting Started: Harvest Files の画像を SORACOM Lagoon で表示する | SORACOM Harvest | SORACOM Users

2.3.2 SORACOM Harbest Data/Harvest Filesにファイルやデータを送る

SORACOM Lagoon ダッシュボードでファイルやデータを表示させます。
今回は、以下ファイルとデータを送ります。

ファイル: 画像ファイル
データ: 緯度経度情報, 部屋番号


「employee_anomaly_detection.py」の対象となる関数は以下の通りです。

def sendSoracom(filename):
    send_imageFile ='curl -v -X PUT --data-binary @'  + filename + ' -H content-type:images/png http://harvest-files.soracom.io/lagoon/harvest.png' #<<<<<<画像ファイルを送る
    send_message ='curl -v -X POST -H content-type:application/json -d {\"latitude\":34.6872318,\"longitude\":135.5259173,\"room_num\":' + ROOM_NUM + '} http://harvest.soracom.io' #<<<<<<緯度経度/部屋番号情報を送る(緯度経度は、サンプルとして大阪城の位置情報となっています)
    res = subprocess.call(send_message.split())
    res = subprocess.call(send_imageFile.split())


おわりに

今回は、人物検知と動体検知機能と組み合わせることで、
異常・密集判定し、3種類の方法で通知ができるようにしました。
異常検知に際して、カメラを作業現場に固定しておくことができるため
「装置を装着する等の作業員の追加作業が必要ない」という目標は、達成できたのではないかと思います。
ただ、「異常を正しく検知する」に関しては、まだまだ改善の余地がありそうなので、今後の課題と考えています。
機能の実装については、部分的な説明で大変申し訳ありませんが、試しに使ってみたい方は「employee_anomaly_detection.py」を使って試してください。

物体検知機能を使用すると、他にも色々なことができそうなので、今後も応用方法を考えて実行したいと思います。