Azure Stream Analytics を IoT Edge モジュールとしてデプロイする
この記事の内容
- Azure Stream Analytics ジョブを「Edge」ホスティング環境で作成する方法
- 入力・出力・クエリの定義手順
- IoT Edge デバイスへのモジュールデプロイ(温度センサー+ASA ジョブ)
- モジュール間のルーティング設定
- Edge 上で ASA ジョブが実際に動作することの確認
Azure Stream Analytics を Edge で動かす意義
Azure Stream Analytics(ASA)は通常クラウド上で実行するサービスですが、IoT Edge モジュールとして Edge デバイス上に展開することもできます。
Edge 上で実行する利点は大きく2点あります。1つ目は低レイテンシです。クラウドへのラウンドトリップが不要になるため、リアルタイム性が高まります。2つ目は通信コストの削減です。すべてのデータをクラウドに送り続けるのではなく、Edge 側でフィルタリングや集計を行ってから必要な情報だけをクラウドに転送できます。
仕組みとしては、Azure 上で ASA ジョブの定義を作成すると、その定義が Azure Storage アカウントに保存されます。IoT Edge デバイスはモジュールとして起動する際にこの定義を取得し、Edge 上で実際の処理を実行します。
Azure Stream Analytics ジョブの作成
Azure ポータルで新しい Stream Analytics ジョブを作成します。
作成時の重要なポイントは「ホスティング環境」の選択です。通常はクラウドを選択しますが、今回は Edge を選択します。
| 設定項目 | 値 |
|---|---|
| ジョブ名 | 任意(例: asa-edge-job) |
| ホスティング環境 | Edge |
| 場所 | 任意(例: 東日本) |
作成後、ジョブリソースへ移動すると、入力・出力・クエリという3つの要素を定義する画面になります。
入力の定義
今回のシナリオでは、IoT Edge のハブ(Edge Hub)から温度データを受け取ります。
「ストリーム入力の追加」から入力を追加します。入力元として Edge Hub を選択し、以下のように設定します。
| 設定項目 | 値 |
|---|---|
| 入力エイリアス | temperature(任意) |
| イベントシリアル化形式 | JSON |
| エンコード | UTF-8 |
| イベント圧縮タイプ | なし |
これで「Edge Hub からデータを受け取る」という入力定義が完成します。
出力の定義
出力先も同様に Edge Hub を選択します。分析結果のアラートを Edge Hub 経由で送信する設定です。
| 設定項目 | 値 |
|---|---|
| 出力エイリアス | alert(任意) |
クエリの定義
クエリでは「30秒間のタンブリングウィンドウ内の平均温度が70度以上であればリセットコマンドを送信する」というロジックを定義します。
SELECT
'reset' AS command
INTO
alert
FROM
temperature TIMESTAMP BY timeCreated
GROUP BY
TumblingWindow(second, 30)
HAVING
AVG(machine.temperature) > 70
TumblingWindow(second, 30)が30秒間隔の集計を意味します- 平均温度が70度を超えた場合に
resetコマンドを出力します resetはセンサーモジュールに事前にプログラムされたアクションです
クエリを保存すると定義が完了します。
IoT の設定(ストレージアカウントとの関連付け)
Edge 上にデプロイするために、ジョブ定義を格納する Azure Storage アカウントを関連付けます。
ジョブの「IoT Edge の設定」から対象のサブスクリプションとストレージアカウントを選択します。これにより、ジョブの定義がストレージアカウント上に保存され、Edge デバイスがそれを参照できるようになります。
IoT Edge へのモジュールデプロイ
Azure ポータルで対象の IoT Hub → デバイス → モジュールの設定画面へ移動します。
デプロイするモジュールは以下の2つです。
モジュール①:SimulatedTemperatureSensor
温度センサーをシミュレートするモジュールです。このモジュールには reset などのアクションが事前に定義されています。
「IoT Edge モジュールの追加」から以下のように設定します。
| 設定項目 | 値 |
|---|---|
| モジュール名 | SimulatedTemperatureSensor |
| イメージの URI | Microsoft 公式イメージの URI |
モジュール②:Azure Stream Analytics Edge ジョブ
「Azure Stream Analytics モジュール」として追加します。
| 設定項目 | 値 |
|---|---|
| サブスクリプション | 該当のサブスクリプション |
| Edge ジョブ | 先ほど作成した ASA ジョブ |
追加すると、モジュール名・イメージ URI・環境変数(プランIDなど)・モジュールツインの設定が自動的に入力されます。モジュールツインのプロパティにはジョブリソースIDやストレージアカウント情報も含まれています。
ルートの設定
モジュール間のメッセージのやり取りを定義します。今回は以下の4つのルートを設定します。
| ルート名 | 内容 |
|---|---|
telemetry2cloud | SimulatedTemperatureSensor の出力をクラウド(上流)へ送信 |
alert2cloud | ASA ジョブの出力(アラート)をクラウドへ送信 |
reset2sensor | ASA ジョブから出たリセットコマンドを SimulatedTemperatureSensor の入力(input/control)へ渡す |
telemetry2asa | SimulatedTemperatureSensor の出力を ASA ジョブの入力(temperature)へも渡す |
このルート設定により、温度センサーが出したデータはクラウドにも送られながら、同時に ASA ジョブの入力にも渡されます。ASA ジョブが条件を満たしたと判断すると reset コマンドを出力し、それが再び温度センサーモジュールのコントロール入力へと渡されます。
「確認および作成」で全体の定義を確認し、デプロイを実行します。
動作確認
デプロイが完了したら、Edge デバイスにアクセスしてモジュールの動作を確認します。
sudo iotedge list
実行結果に以下のモジュールが正常稼働していれば成功です。
SimulatedTemperatureSensor- ASA ジョブのモジュール
edgeAgentedgeHub
ASA ジョブのログを確認すると、Storage アカウントからジョブ定義を取得し、ジョブエンジンを起動してメッセージを処理している様子が確認できます。
温度センサーモジュールのログを見ると、温度が徐々に上昇していくことがわかります。
30秒間の平均が70度を超えると、ASA ジョブが reset コマンドを発行します。メッセージボディに reset が含まれたメッセージが送信され、センサーがリセットされて温度が再び低い値(21, 22, 23…)に戻ることが確認できます。
また、Azure Storage アカウントの対象コンテナを確認すると、ASA ジョブの定義ファイル(JobDefinition.json など)が格納されており、Edge モジュールがこのファイルを参照して動作していることがわかります。
まとめ
今回は Azure Stream Analytics を IoT Edge モジュールとしてデプロイするチュートリアルを実施しました。
- ASA ジョブを作成する際、ホスティング環境として「Edge」を選択することで Edge デプロイ用のジョブとして定義できます
- 入力・出力・クエリを定義し、Storage アカウントと紐付けることでジョブ定義がクラウドに保存されます
- IoT Edge デバイスに ASA モジュールと温度センサーモジュールをデプロイし、ルートでモジュール間のメッセージフローを定義します
- Edge Hub がハブとして機能し、モジュール間のメッセージルーティングを担います
- Azure 上で定義した ASA クエリがそのまま Edge 上でも動作するため、クラウドの ASA に慣れているエンジニアにとって取り組みやすい構成です
リアルタイムに流れてくるメッセージに対してクエリを適用し、クラウドではなく Edge 上で処理を完結させるこのアーキテクチャは、低遅延・低帯域な IoT ソリューションを構築する際に有効な選択肢となります。