Azure Stream Analytics を IoT Edge モジュールとしてデプロイする

この記事の内容

  • Azure Stream Analytics ジョブを「Edge」ホスティング環境で作成する方法
  • 入力・出力・クエリの定義手順
  • IoT Edge デバイスへのモジュールデプロイ(温度センサー+ASA ジョブ)
  • モジュール間のルーティング設定
  • Edge 上で ASA ジョブが実際に動作することの確認

Azure Stream Analytics を Edge で動かす意義

Azure Stream Analytics(ASA)は通常クラウド上で実行するサービスですが、IoT Edge モジュールとして Edge デバイス上に展開することもできます。

Edge 上で実行する利点は大きく2点あります。1つ目は低レイテンシです。クラウドへのラウンドトリップが不要になるため、リアルタイム性が高まります。2つ目は通信コストの削減です。すべてのデータをクラウドに送り続けるのではなく、Edge 側でフィルタリングや集計を行ってから必要な情報だけをクラウドに転送できます。

仕組みとしては、Azure 上で ASA ジョブの定義を作成すると、その定義が Azure Storage アカウントに保存されます。IoT Edge デバイスはモジュールとして起動する際にこの定義を取得し、Edge 上で実際の処理を実行します。


Azure Stream Analytics ジョブの作成

Azure ポータルで新しい Stream Analytics ジョブを作成します。

作成時の重要なポイントは「ホスティング環境」の選択です。通常はクラウドを選択しますが、今回は Edge を選択します。

設定項目
ジョブ名任意(例: asa-edge-job
ホスティング環境Edge
場所任意(例: 東日本)

作成後、ジョブリソースへ移動すると、入力・出力・クエリという3つの要素を定義する画面になります。


入力の定義

今回のシナリオでは、IoT Edge のハブ(Edge Hub)から温度データを受け取ります。

「ストリーム入力の追加」から入力を追加します。入力元として Edge Hub を選択し、以下のように設定します。

設定項目
入力エイリアスtemperature(任意)
イベントシリアル化形式JSON
エンコードUTF-8
イベント圧縮タイプなし

これで「Edge Hub からデータを受け取る」という入力定義が完成します。


出力の定義

出力先も同様に Edge Hub を選択します。分析結果のアラートを Edge Hub 経由で送信する設定です。

設定項目
出力エイリアスalert(任意)

クエリの定義

クエリでは「30秒間のタンブリングウィンドウ内の平均温度が70度以上であればリセットコマンドを送信する」というロジックを定義します。

SELECT
    'reset' AS command
INTO
    alert
FROM
    temperature TIMESTAMP BY timeCreated
GROUP BY
    TumblingWindow(second, 30)
HAVING
    AVG(machine.temperature) > 70
  • TumblingWindow(second, 30) が30秒間隔の集計を意味します
  • 平均温度が70度を超えた場合に reset コマンドを出力します
  • reset はセンサーモジュールに事前にプログラムされたアクションです

クエリを保存すると定義が完了します。


IoT の設定(ストレージアカウントとの関連付け)

Edge 上にデプロイするために、ジョブ定義を格納する Azure Storage アカウントを関連付けます。

ジョブの「IoT Edge の設定」から対象のサブスクリプションとストレージアカウントを選択します。これにより、ジョブの定義がストレージアカウント上に保存され、Edge デバイスがそれを参照できるようになります。


IoT Edge へのモジュールデプロイ

Azure ポータルで対象の IoT Hub → デバイス → モジュールの設定画面へ移動します。

デプロイするモジュールは以下の2つです。

モジュール①:SimulatedTemperatureSensor

温度センサーをシミュレートするモジュールです。このモジュールには reset などのアクションが事前に定義されています。

「IoT Edge モジュールの追加」から以下のように設定します。

設定項目
モジュール名SimulatedTemperatureSensor
イメージの URIMicrosoft 公式イメージの URI

モジュール②:Azure Stream Analytics Edge ジョブ

「Azure Stream Analytics モジュール」として追加します。

設定項目
サブスクリプション該当のサブスクリプション
Edge ジョブ先ほど作成した ASA ジョブ

追加すると、モジュール名・イメージ URI・環境変数(プランIDなど)・モジュールツインの設定が自動的に入力されます。モジュールツインのプロパティにはジョブリソースIDやストレージアカウント情報も含まれています。


ルートの設定

モジュール間のメッセージのやり取りを定義します。今回は以下の4つのルートを設定します。

ルート名内容
telemetry2cloudSimulatedTemperatureSensor の出力をクラウド(上流)へ送信
alert2cloudASA ジョブの出力(アラート)をクラウドへ送信
reset2sensorASA ジョブから出たリセットコマンドを SimulatedTemperatureSensor の入力(input/control)へ渡す
telemetry2asaSimulatedTemperatureSensor の出力を ASA ジョブの入力(temperature)へも渡す

このルート設定により、温度センサーが出したデータはクラウドにも送られながら、同時に ASA ジョブの入力にも渡されます。ASA ジョブが条件を満たしたと判断すると reset コマンドを出力し、それが再び温度センサーモジュールのコントロール入力へと渡されます。

「確認および作成」で全体の定義を確認し、デプロイを実行します。


動作確認

デプロイが完了したら、Edge デバイスにアクセスしてモジュールの動作を確認します。

sudo iotedge list

実行結果に以下のモジュールが正常稼働していれば成功です。

  • SimulatedTemperatureSensor
  • ASA ジョブのモジュール
  • edgeAgent
  • edgeHub

ASA ジョブのログを確認すると、Storage アカウントからジョブ定義を取得し、ジョブエンジンを起動してメッセージを処理している様子が確認できます。

温度センサーモジュールのログを見ると、温度が徐々に上昇していくことがわかります。

temperature:21,22,23,26...7470

30秒間の平均が70度を超えると、ASA ジョブが reset コマンドを発行します。メッセージボディに reset が含まれたメッセージが送信され、センサーがリセットされて温度が再び低い値(21, 22, 23…)に戻ることが確認できます。

また、Azure Storage アカウントの対象コンテナを確認すると、ASA ジョブの定義ファイル(JobDefinition.json など)が格納されており、Edge モジュールがこのファイルを参照して動作していることがわかります。


まとめ

今回は Azure Stream Analytics を IoT Edge モジュールとしてデプロイするチュートリアルを実施しました。

  • ASA ジョブを作成する際、ホスティング環境として「Edge」を選択することで Edge デプロイ用のジョブとして定義できます
  • 入力・出力・クエリを定義し、Storage アカウントと紐付けることでジョブ定義がクラウドに保存されます
  • IoT Edge デバイスに ASA モジュールと温度センサーモジュールをデプロイし、ルートでモジュール間のメッセージフローを定義します
  • Edge Hub がハブとして機能し、モジュール間のメッセージルーティングを担います
  • Azure 上で定義した ASA クエリがそのまま Edge 上でも動作するため、クラウドの ASA に慣れているエンジニアにとって取り組みやすい構成です

リアルタイムに流れてくるメッセージに対してクエリを適用し、クラウドではなく Edge 上で処理を完結させるこのアーキテクチャは、低遅延・低帯域な IoT ソリューションを構築する際に有効な選択肢となります。