アプリケーション連携チュートリアル

重要

このドキュメントについては Sora サポートの対象外です。

このドキュメントは Sora のウェブフックと API を利用したアプリケーションサーバー開発者向けです。

概要

Sora はウェブフックと API を利用して、アプリケーションサーバーと連携することができます。

ウェブフック

ウェブフックは認証と払い出し、そしてコネクションやセッション、録画などの状況の取得です。

Sora から一定間隔で送られてくるウェブフックを利用する事で定期的な状態を取得することができます。 どのコネクションが何分繫いでいたかの把握や、そのセッションは何分で終了させるといったことも実現できます。

ウェブフックは Sora からアプリケーションサーバーへ HTTP/1.1 でリクエストで送信します。

API

ウェブフックはコネクションの切断、セッションの破棄、統計情報の取得などアプリケーションサーバーが必要とするタイミングで送信することができます。

API はアプリケーションサーバーから Sora へ HTTP/1.1 でリクエストを送信する必要があります。

JSON

Sora がアプリケーションに送信するウェブフックの JSON は snake_case を採用しています。 camelCase ではないことに注意してください。

snake_case の JSON はシグナリング、ウェブフック、 API で利用します。

ウェブフックのローカル開発

Sora をサーバーに立て、ウェブフックの送信先をローカルにしたい場合には ngrok がお勧めです。

ngrok の利用方法

URL:

https://ngrok.com/

ngrok は無料プランでも、静的ドメインを一つ利用する事ができるため、 この静的ドメインを Sora 側に指定することにより、ローカルのウェブフックを利用することができます。

ngrok は Windows / Linux / macOS で利用可能です。

また ngrok が HTTPS を終端し、HTTP でアプリケーションサーバーにリクエストを送信するため、 アプリケーションサーバー側を HTTPS にする必要はありません。

シーケンス図

ngrok を利用した場合のシーケンス図です。 --domain には example ではなく ngrok のダッシュボードに表示されているドメインを指定してください。

サンプルコードについて

アプリケーションサーバーのサンプルコードを Python を利用しています。 あくまで例であり、実際のアプリケーションでの利用は想定しておりません。

現在は OpenAI ChatGPTGitHub Copilot を利用する事で、 他の言語への変換がとても簡単に行えるようになったこともあり、 サンプルコードに利用するプログラミング言語を Python に統一しています。

Python 3.12 以降で動作確認をしています。

aiohttp

サンプルコードでは Python の非同期 HTTP ライブラリである aiohttp を利用しています。

$ pip install aiohttp でインストールして利用してください。

Django 風

ウェブフックの処理を行うコードです。 このドキュメントではこちらの書き方を利用します。

from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    return web.json_response({"allowed": True})


app = web.Application()
app.router.add_routes(
    [
        web.post("/auth", auth_webhook),
    ]
)

if __name__ == "__main__":
    web.run_app(app, port=8000)

Flask 風

デコレーターを利用して、ウェブフックの処理を行うコードです。 Django 風と Flask 風、どちらを利用するかは好みの問題ですので、 好きな方を利用してください。

from aiohttp import web

routes = web.RouteTableDef()


@routes.post("/auth")
async def auth_webhook(request: web.Request) -> web.Response:
    return web.json_response({"allowed": True})


app = web.Application()
app.router.add_routes(routes)

if __name__ == "__main__":
    web.run_app(app, port=8000)

ウェブフック HTTP 実装

ウェブフックを処理するコードです。

sora.conf の以下の設定に URL を指定してください。

例として ngrok を利用している URL を指定しています。

auth_webhook_url = https://example.ngrok-free.app/auth
session_webhook_url = https://example.ngrok-free.app/session
event_webhook_url = https://example.ngrok-free.app/event

認証が常に成功するようなコードになっています。

from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    return web.json_response({"allowed": True})


async def session_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


async def event_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


app = web.Application()
app.router.add_routes(
    [
        web.post("/auth", auth_webhook),
        web.post("/session", session_webhook),
        web.post("/event", event_webhook),
    ]
)

if __name__ == "__main__":
    web.run_app(app, port=8000)

ウェブフック HTTPS 対応

Sora のウェブフックは HTTPS にも対応しています。

Sora の HTTPS はサーバ証明書のチェックをデフォルトで OS に組み込まれた証明書を利用して行います。 自前で用意した証明書などを利用したい場合は webhook_tls_verify_cacert_file を利用してください。

詳細は ウェブフックリクエストなどの送信先サーバー証明書の検証に利用する OS 組み込みのルート CA 証明書について をご確認ください。

import ssl

from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    return web.json_response({"allowed": True})


async def session_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


async def event_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


app = web.Application()
app.router.add_routes(
    [
        web.post("/auth", auth_webhook),
        web.post("/session", session_webhook),
        web.post("/event", event_webhook),
    ]
)


# サーバー証明書
certfile: str = "192.0.2.1.pem"
# サーバー証明書のプライベートキー
keyfile: str = "192.0.2.1-key.pem"

if __name__ == "__main__":
    # SSL コンテキストの作成
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile, keyfile)

    # HTTPS サーバーとして起動
    web.run_app(app, port=4433, ssl_context=ssl_context)

ウェブフック mTLS (mutual-TLS またはクライアント認証) 対応

Sora のウェブフックは mTLS にも対応しています。 その場合、アプリケーションサーバー側に、Sora に設定した証明書の CA 証明書を指定する必要があります。

import ssl

from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    return web.json_response({"allowed": True})


async def session_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


async def event_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


app = web.Application()
app.router.add_routes(
    [
        web.post("/auth", auth_webhook),
        web.post("/session", session_webhook),
        web.post("/event", event_webhook),
    ]
)

# サーバー証明書
certfile: str = "192.0.2.1.pem"
# サーバー証明書のプライベートキー
keyfile: str = "192.0.2.1-key.pem"
# クライアント証明書のCA証明書
cafile: str = "ca.pem"

if __name__ == "__main__":
    # SSL コンテキストの作成
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile, keyfile)
    ssl_context.load_verify_locations(cafile)

    # クライアント証明書の検証を要求
    ssl_context.verify_mode = ssl.CERT_REQUIRED

    # HTTPS サーバーとして起動
    web.run_app(app, port=4433, ssl_context=ssl_context)

ウェブフック IPv6 対応

Sora は昨今の IPv4 事情を考慮して IPv6 にも対応しています。

sora.confwebhook_ipv6true にすることで、 IPv6 のみのウェブフックエンドポイントと通信をすることができます。

認証ウェブフック

Sora 自体は認証処理を持っていません。 そのため、アプリケーションサーバー側で認証の仕組みを開発する必要があります。

Sora は auth_webhook_url に指定した URL にクライアントから送られてきた情報などを、 アプリケーションサーバに対して HTTP (または HTTPS) リクエストとして送信します。

認証メタデータ

認証ウェブフックにはクライアントが送ってくる metadata が含まれます。 これはクライアントが自由に値を含められる場所で、ここの値を使って認証を行うことを推奨しています。

from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    data = await request.json()
    # metadata がない場合は None が返る
    metadata = data.get("metadata")
    if not metadata:
        # metadata がない場合は認証を許可しない
        # 認証を拒否する場合は 'allowed': False と 'reason': '認証失敗理由' を返す
        # この認証失敗理由はクライアントまで通知されるので注意すること
        return web.json_response({"allowed": False, "reason": "metadata not found"})

    # 認証を許可する JSON を返す
    return web.json_response({"allowed": True})

認証メタデータに JWT

認証メタデータの中に access_token として JWT が入ってきた場合の処理です。

PyJWT を利用して、JWT をデコードして認証を行います。

import os

import jwt
from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    data = await request.json()
    metadata = data.get("metadata")
    if not metadata:
        return web.json_response({"allowed": False, "reason": "metadata not found"})

    access_token = metadata.get("access_token")
    if not access_token:
        return web.json_response({"allowed": False, "reason": "access_token not found"})

    # 環境変数からシークレットキーを取得
    secret = os.environ.get("JWT_SECRET_KEY")
    if not secret:
        return web.json_response({"allowed": False, "reason": "secret key not found"})

    try:
        # PyJWT を利用して JWT をデコードし、署名を検証する
        jwt.decode(access_token, secret, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        # トークンが無効の場合は認証を拒否
        return web.json_response({"allowed": False, "reason": "invalid token"})

    # 認証を許可する JSON を返す
    return web.json_response({"allowed": True})

イベントメタデータ

認証ウェブフックで、認証成功時にイベントメタデータを払い出すことができます。 イベントメタデータはイベントウェブフックの connection.{created, updated, destroyed} に含まれます。

このイベントメタデータは Sora とアプリケーションサーバー間だけでやり取りされ、クライアントには送られません。 そのためデータベースの Primary Key などを入れておいたりすることができます。

from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    # ここでデータベースなどの値を引っ張る
    account_pk = 1

    # イベントメタデータ
    event_metadata = {
        "account_pk": account_pk,
    }

    # 認証を許可する JSON を返す
    return web.json_response({"allowed": True, "event_metadata": event_metadata})
    return web.json_response({"allowed": True, "event_metadata": event_metadata})

イベントメタデータはウェブフックログには REDACTED として記録されます。 これはセンシティブデータを含んでいる可能性があるためです。

REDACTED については センシティブデータ をご確認ください。

        sequenceDiagram
    participant C as クライアント
    participant S as Sora
    participant A as アプリケーション
    C ->>+ S: "type": "connect"
    S ->>+ A: 認証ウェブフック
    A -->>- S: 200 OK<br>"allowed": true<br>"event_metadata": {"account_pk": 1}
    S ->>+ A: セッションウェブフック<br>session.created
    A -->>- S: 200 OK
    S ->>- C: "type": "offer"
    note over C,S: WebRTC 確立
    S ->>+ A: イベントウェブフック<br>connection.created<br>"event_metadata": {"account_pk": 1}
    A -->>- S: 200 OK
    S ->>+ A: イベントウェブフック<br>connection.updated<br>"event_metadata": {"account_pk": 1}
    A -->>- S: 200 OK
    note over S,A: connection.updated は一定間隔で送信
    C -) S: "type": "disconnect"
    S -) C: "Close"
    note over C,S: WebRTC 切断
    S ->>+ A: イベントウェブフック<br>connection.destroyed<br>"event_metadata": {"account_pk": 1}
    A -->>- S: 200 OK
    

セッションウェブフック

セッションウェブフックはセッション単位の変化を通知するためのウェブフックです。

Sora は session_webhook_url に指定した URL にセッションの変化を HTTP リクエストとして送信します。

セッション

セッションはチャネルで少なくとも 1 つのクライアントが接続しているチャネルの状態を指します。

セッションメタデータ

from aiohttp import web


async def session_webhook(request: web.Request) -> web.Response:
    # ここでデータベースなどの値を引っ張る
    room_pk = 2
    return web.json_response({"session_metadata": {"room_pk": room_pk}})

セッション録画開始

セッションウェブフックの session.created の戻り値に recording: true を含める事で、 そのセッションが開始したタイミングから録画を開始することができます。

from aiohttp import web


async def session_webhook(request: web.Request) -> web.Response:
    # このセッションで録画を有効にする
    return web.json_response({"recording": True})

イベントウェブフック

イベントウェブフックはコネクション単位での変化を通知するためのウェブフックです。

  • コネクションが接続したら、コネクション接続のイベントが送信されます

  • コネクションが接続している間は、コネクション更新のイベントが一定間隔で送信されます

  • コネクションが切断したら、コネクション切断のイベントが送信されます

connection.{created,updated,destroyed}

from aiohttp import web


async def event_webhook(request: web.Request) -> web.Response:
    data = await request.json()

    match data.get("type"):
        case "connection.created":
            # connection.created イベントの処理
            # データベースに保存したりする
            pass
        case "connection.updated":
            # connection.updated イベントの処理
            # データベースに保存したりする
            pass
        case "connection.destroyed":
            # connection.destroyed イベントの処理
            # データベースに保存したりする
            pass
        case _:
            # 未対応のイベントの処理
            pass

    return web.Response(status=204)

API

Sora の API は、パスを利用せず、ヘッダーを利用して判定する方式を採用しています。

  • メソッドは常に POST を利用します

  • パスは常に / を利用します

  • x-sora-target ヘッダーを利用して、どの API にアクセスするかを判定します

指定したチャネルのコネクションを切断する

import asyncio

import aiohttp


async def main():
    url = "https://sora.example.com"
    headers = {
        "X-Sora-Target": "Sora_20151104.DisconnectConnection",
    }
    data = {
        "channel_id": "sora",
        "connection_id": "T34CDBMRJS1B5BVPF17RTBQA3C",
    }

    async with aiohttp.ClientSession() as session:
        # Nginx で api の prefix を /api に設定している場合は以下のように変更
        # response = await session.post(f"{url}/api", json=data, headers=headers)
        response = await session.post(url, json=data, headers=headers)

        print(response.status)
        response_data = await response.text()
        print(response_data)


if __name__ == "__main__":
    asyncio.run(main())

接続してから 3 分経過したコネクションを切断する

イベントウェブフック connection.updated と、 コネクション切断 DisconnectConnection API を利用して、 接続してから 3 分経過したコネクションを切断します。

import aiohttp
from aiohttp import web


async def auth_webhook(request: web.Request) -> web.Response:
    return web.json_response({"allowed": True})


async def session_webhook(request: web.Request) -> web.Response:
    return web.Response(status=204)


async def event_webhook(request: web.Request) -> web.Response:
    data = await request.json()
    match data.get("type"):
        case "connection.updated":
            if data.get("minutes", 0) >= 3:
                channel_id = data.get("channel_id")
                connection_id = data.get("connection_id")

                url = "https://sora.example.com/api"

                headers = {
                    "X-Sora-Target": "Sora_20151104.DisconnectConnection",
                }

                data = {
                    "channel_id": channel_id,
                    "connection_id": connection_id,
                }

                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        url,
                        headers=headers,
                        json=data,
                    ) as response:
                        if response.status != 200:
                            # エラー処理
                            pass
        case _:
            # 見知らぬイベントの処理
            pass

    return web.Response(status=204)


app = web.Application()
app.router.add_routes(
    [
        web.post("/auth", auth_webhook),
        web.post("/session", session_webhook),
        web.post("/event", event_webhook),
    ]
)

if __name__ == "__main__":
    web.run_app(app, port=8000)

シーケンス図

        sequenceDiagram
    participant C as クライアント
    participant S as Sora
    participant A as アプリケーション
    C ->>+ S: "type": "connect"
    S ->>+ A: 認証ウェブフック
    A -->>- S: 200 OK<br>"allowed": true
    S ->>+ A: セッションウェブフック<br>session.created
    A -->>- S: 200 OK
    S ->>- C: "type": "offer"
    note over C,S: WebRTC 確立
    S ->>+ A: イベントウェブフック<br>connection.created<br>"minutes": 0
    A -->>- S: 200 OK
    S ->>+ A: イベントウェブフック<br>connection.updated<br>"minutes": 1
    A -->>- S: 200 OK
    S ->>+ A: イベントウェブフック<br>connection.updated<br>"minutes": 2
    A -->>- S: 200 OK
    note over C,A: WebRTC 確立してから 10 分経過
    S ->>+ A: イベントウェブフック<br>connection.updated<br>"minutes": 10
    A -->>- S: 200 OK
    A ->>+ S: DisconnectConnection API
    S -->>- A: 200 OK
    S -) C: "Close"
    note over C,S: WebRTC 切断
    S ->>+ A: イベントウェブフック<br>connection.destroyed<br>"minutes": 10
    A -->>- S: 200 OK
    
© Copyright 2024, Shiguredo Inc Created using Sphinx 8.2.3