仙石浩明の日記

2007年1月22日

Perl の非同期I/Oモジュール POE を使って VPN-Warp relayagent を書いてみました hatena_b

多数の TCP/IP セッションを同時に維持する必要性などから、 非同期I/O が最近流行りのようです。 何をいまさら、という気もするのですが、 いわゆる「最新技術」の多くが 30年前の技術の焼き直しに過ぎない今日このごろなので、 非同期I/O 技術が「再発見」されるのも、 「歴史は繰り返す」の一環なのでしょう。 スレッドが当たり前の時代になってからコンピュータ技術を学んだ人にとっては、 (古めかしい) 非同期I/O が新鮮に映るのかも知れず、 なんだか「ファッションのリバイバル」に似ていますね。

Perl で非同期I/O 処理を手軽に行なうための枠組みとして、 POE: Perl Object Environment というものが あるようです。 POE を使うと、 あたかもスレッドを使っているような手軽さでプログラミングできます。 試しに VPN-Warp の relayagent を POE を使って書いてみました。 オリジナルの relayagent は C 言語で記述した 4000 行を超える プログラムなのですが、 Perl だと 200 行以下で一通り動くものが書けてしまいました (もちろん C 版の機能を全て実装したわけではありません)。

POE を触るのは今回が初めてだったので、 マニュアルをいちいち参照しながら書いたのですが、 なにせわずか 200 行ですから、 開発はデバッグ込みで 1 日かかりませんでした。 改めて Perl の記述性の良さと開発効率の高さに感動したのですが、 これだけ簡潔に書けてしまうと、 relayagent の機能を解説するときの教材としても使えそうです。

というわけで、 今までブラックボックスだった relayagent の中身の解説を試みたいと思います。 これから POE を使ってみようとする人の参考にもなれば幸いです。

VPN-Warp の relayagent とは、 以下の図のようにリレーサーバと Webサーバの両方へ接続して、 リレーサーバから受取ったリクエストを Webサーバへ中継するプログラムです。 http リクエストを受取ってサービスを行なうのですから、 サーバの一種と言えますが、 外部から接続を受付けるわけではなく、 リレーサーバと Webサーバの両方に対してクライアントとして振る舞う点が ユニークと言えるでしょう。

                      リレー            イントラ         イントラ
ブラウザ ─────→ サーバ ←──── relayagent──→ Webサーバ
            https     443番ポート                        80番ポート

http リクエストを受取って Webサーバへ中継するプログラムというと、 proxy サーバを思い浮かべるかも知れません。 proxy サーバはその名の通り、 ブラウザに対してはサーバとして振る舞います:

                                        proxy            イントラ
ブラウザ ──────────────→ サーバ────→ Webサーバ
                                        8080番ポート     80番ポート

proxy サーバが、ブラウザからの接続を受付けて、 それを Webサーバに中継するのに対し、 relayagent は自身では接続を受付けずに中継する、 という違いがお分かりでしょうか? relayagent は接続を受ける必要がないため、 ファイアウォールの内側など、 外部からアクセスできない場所で使うことが可能になっています。

なお、C 版の relayagent はリレーサーバに対して https で接続するのですが、 Perl 版 relayagent (以下 relayagent.pl) は、 説明の都合上 SSL 暗号化の機能を含んでいません。 実際に使うときは、 stone などで SSL 暗号化して リレーサーバに接続する必要があります。

         リレー                         イントラ         イントラ
         サーバ ←──── stone ←── relayagent──→ Webサーバ
         443番ポート       SSL化        Perl 版          80番ポート

例えば stone を

stone -q pfx=relay,5000005.pfx \
      -q passfile=relay,5000005-pass.txt \
      warp.klab.org:443/ssl localhost:12345 &

などと実行しておき、 relayagent.pl はリレーサーバに接続する代わりに、 localhost の 12345 番に接続します。

では、relayagent.pl を順に見ていきましょう。

#!/usr/bin/perl
use POE qw(Component::Client::TCP Filter::Stream);
my $IdleTimerMax = 6;        # 60 sec
&help unless @ARGV == 2;
&help unless shift =~ m/^(\w+):(\d+)$/;
my ($RelayHost, $RelayPort) = ($1, $2);
&help unless shift =~ m/^(\w+):(\d+)$/;
my ($WebHost, $WebPort) = ($1, $2);
my %WebHeap;
my $PollBuf;
my $PollHeap;
my $PollHeader;
my $IdleTimer;
my $DisconectTime = 0;

$RelayHost, $RelayPort は、 リレーサーバのホスト名とポート番号ですが、
前述したように stone 経由でリレーサーバにつなぐために、
$RelayHost = "localhost", $RelayPort = 12345 などとなります。また、 $WebHost, $WebPort は、 中継先となる (イントラの) Webサーバのホスト名とポート番号です。

続いて、リレーサーバへ接続する (直接の接続先は SSL 化を行なう stone ですが、 煩雑になるので以下 「リレーサーバ」 と略記します) ためのコードです:

POE::Component::Client::TCP->new
    ( RemoteAddress => $RelayHost,
      RemotePort    => $RelayPort,
      Connected     => sub {
          $PollHeap = $_[HEAP];
          undef $PollHeader;
          $PollBuf = "";
          $IdleTimer = $IdleTimerMax;
          $PollHeap->{server}->
              put("GET /KLAB/poll HTTP/1.1\r\nX-Ver: realyagent.pl 0.01\r\n\r\n");
      },
      ServerInput   => sub {
          $PollHeap = $_[HEAP];
          $PollBuf .= $_[ARG0];
          &doPoll;
      },
      Filter        => POE::Filter::Stream->new(),
      Disconnected  => \&reconnectPoll,
    );

POE では、非同期に動く処理を、 処理ごとに分けて書くことができます。 各処理のことを「POEセッション」と呼びます。

上記は、リレーサーバへ接続する POEセッションの生成です。 接続先ホストおよびポートを、 それぞれ $RelayHost と $RelayPort に設定しています。

「Connected => sub {」から始まる部分が、 接続に成功したときに実行するコードです。 細かいところはさておき、 接続したら以下のリクエストをリレーサーバに送る、 という点は読み取れるのではないでしょうか。

GET /KLAB/poll HTTP/1.1
X-Ver: realyagent.pl 0.01

同様に、 「ServerInput => sub {」から始まる部分が、 通信相手 (リレーサーバ) からデータを受信したときに実行するコードです。 受信したデータは、 いったん変数 $PollBuf に溜めておいて、 続いて呼び出す doPoll の中で処理を行ないます。

以上からお分かりのように、 リレーサーバへデータを送るときは、
「$PollHeap->{server}->put(送るべきデータ);」を実行し、 リレーサーバからデータが送られてきた時は、 doPoll で受取ります。 とても見通しが良いですね。

各 POEセッションは、スレッドと同様、同一メモリ空間を共有しているので、 他の POEセッションが変更した変数の値を参照できます。 したがってどの POEセッションでもリレーサーバへデータを送ることができますし、 リレーサーバから受信したデータはどの POEセッションでも読むことができます。

続いて、もう一つ POEセッションを作ります。

POE::Session->create
    ( inline_states =>
      { _start => sub {
          $_[KERNEL]->delay( tick => 10 );
        },
        tick => sub {
            if ($IdleTimer > 0) {
                if (--$IdleTimer <= 0) {
                    &sendControl(0, -2);        # keep alive
                }
            }
            $_[KERNEL]->delay( tick => 10 );
        },
      },
    );
$poe_kernel->run;
exit;

この POEセッションは 10秒に一回、 「tick => sub {」から始まる部分を実行します。 見ての通り、$IdleTimer の値を減らしていって、 0 になったら sendControl を実行します。 $IdleTimer は最初 6 ($IdleTimerMax) に設定されるので、 1 分ごとに sendControl を実行する、という意味ですね。

以上 2つの POEセッションは作成しただけで、まだ走り出していません。
その次の「$poe_kernel->run;」が各 POEセッションを走らせるための呼び出しです。 このルーチンは全ての POEセッションが終了するまで返ってきません。

さて、relayagent はリレーサーバとの接続を常時維持していますが、 無通信時間が続くと (通信経路中にあるファイアウォールなどに) 切られてしまう恐れがあるので、 keep alive ブロックを送信しています。 通信が行なわれていない時間を測るためのカウンタが $IdleTimer というわけです。

通信が行なわれない限り $IdleTimer は減り続け、 1 分経過すると sendControl(0, -2) を呼び出して keep alive ブロックを送信します。 sendControl はこんな感じ:

sub sendControl {
    my ($id, $control) = @_;
    $control += 65536 if $control < 0;
    $IdleTimer = $IdleTimerMax;
    if (defined $PollHeap && $PollHeap->{connected}) {
        $PollHeap->{server}->put(pack("nn", $id, $control));
    }
}

既に説明したように「$PollHeap->{server}->put(データ)」は、 リレーサーバにデータを送る呼び出しですから、 「pack("nn", 0, 65534)」が keep alive ブロックであることが分かります。

「ブロック」というのは VPN-Warp 用語でして、 relayagent とリレーサーバとの通信は、 基本的にこの「ブロック」を単位にして行ないます。 ブロックは次のような可変長のデータです。

    ┌───┬───┬───┬───┬───┬─≪─┬───┐
    │セッションID│ データ長  │  可変長データ   │
    └───┴───┴───┴───┴───┴─≫─┴───┘
          2バイト         2バイト      「データ長」バイト

「セッションID」および「データ長」は、ビッグエンディアンです。 つまり上位バイトが先に来ます。 データ長が 0 ないし負数の場合は、 「可変長データ」の部分は 0 バイトになります。

データ長が 0 ないし負数であるブロックは、 コントロール用のブロックで、 以下の意味を持っています:

データ長意味内容
0EOFWebセッションの終了を要求
-1ErrorWebセッションの異常終了を要求
-2Keep Alive無通信状態が続いたときに送信
-3X OFFWebセッションのデータ送信の一時停止を要求
-4X ONWebセッションのデータ送信の再開を要求

ブラウザ送ったリクエストを Webサーバに届け、 Webサーバのレスポンスをブラウザに返す一連の通信のことを、 ここでは「Webセッション」と呼ぶことにします。 つまり、 VPN-Warp が提供する仮想的な通信路 (トンネル) 上のセッションです。

VPN-Warp セッション

ブラウザがリレーサーバと通信するときの TCP/IPセッションと、 relayagent と Webサーバが通信するときの TCP/IPセッションを対応づけるのが、 セッションID です。 「セッション」という言葉が何度も出てきてややこしいですが、 「セッションID」の「セッション」は、 「Webセッション」の意味です。

リレーサーバと relayagent との間は、 複数の Webセッションを一本の TCP/IPセッションに相乗りさせるので、 そのとき各 Webセッションがこんがらないようにするために ブロックにはセッションID がつけられている、というわけです。

では、次はいよいよ relayagent の中核ルーチンである doPoll です:

sub doPoll {
    do {
        if (! defined $PollHeader) {
            if ($PollBuf =~ /\r\n\r\n/) {
                $PollHeader = $`;
                $PollBuf = $';
            }
        }
        return unless defined $PollHeader;
        my ($id, $len, $data) = unpack("nna*", $PollBuf);
        return unless defined $id && defined $len && $len ne "";
        if ($len > 32767) {
            $len -= 65536;
            $PollBuf = $data;
            if ($len == -1) {
                &closeWeb($id);
            }
        } elsif ($len > 0) {
            return unless defined $data && length($data) >= $len;
            ($data, $PollBuf) = unpack "a${len}a*", $data;
            &reqWeb($id, $data);
        } else {        # len == 0
            $PollBuf = $data;
            &closeWeb($id);
        }
    } while ($PollBuf);
}

前述したように、relayagent はリレーサーバに接続したとき、 まず
「GET /KLAB/poll HTTP/1.1」から始まるリクエストヘッダを送ります。 するとリレーサーバは、 次のようなレスポンスを返します:

HTTP/1.1 200 OK
X-Customer: nusers=5&type=1&expire=1169696110&digest=3f6977eceb8c2c43e28e6026b08ba900

そしてこの後 (doPoll において「defined $PollHeader」が真のとき)、 リレーサーバと relayagent は、 前述したブロックを送受信することになります。

「my ($id, $len, $data) = unpack("nna*", $PollBuf);」の部分が、
リレーサーバから受信したブロックを、
「セッションID ($id)」 「データ長 ($len)」 「可変長データ ($data)」 に分解している処理ですね。 続いてブロックの処理が行なわれますが、 コントロールブロックに関する処理は割愛して、 可変長データが付いているブロックの処理を見ていきましょう。 ここで受信した可変長データは、 ブラウザが送信した http リクエストを 2048バイトごとに分割したものです。

つまりリレーサーバは、 ブラウザから https リクエストを受取るたびに「セッションID」を割り振ります。 そして、リクエストをブロックに分割して relayagent へ送信し、 逆に relayagent から受取ったブロックを 同じセッションID ごとに連結して、 http レスポンスとしてブラウザへ送信します。

したがって、 relayagent はリレーサーバから受取ったブロックを 同じセッションID ごとに連結して Webサーバへ中継し、 そのレスポンスをブロックに分割してリレーサーバへ送信すればよいことになります。

同じセッションID ごとに連結して Webサーバへ送信する処理が、 reqWeb です:

sub reqWeb {
    my ($id, $req) = @_;
    if (defined $WebHeap{$id} && $WebHeap{$id}->{connected}) {
        $WebHeap{$id}->{server}->put($req);
    } else {
        POE::Component::Client::TCP->new
            ( RemoteAddress => $WebHost,
              RemotePort    => $WebPort,
              Connected     => sub {
                  $WebHeap{$id} = $_[HEAP];
                  $WebHeap{$id}->{server}->put($req);
              },
              ServerInput   => sub {
                  $WebHeap{$id} = $_[HEAP];
                  &sendRes($id, $_[ARG0]);
              },
              Filter        => POE::Filter::Stream->new(),
              Disconnected  => sub {
                  &sendControl($id, 0);
              },
            );
    }
}

「POE::Component::Client::TCP->new」によって、 Webサーバと通信するための POEセッションを生成しています。 この reqWeb を実行しているのは、 リレーサーバとの通信を受け持つ POEセッションでしたが、 この POEセッションが新たに POEセッションを生成している点に注意してください。

新しく生成した POEセッションは、Webサーバと接続したとき (Connected)、
「$WebHeap{$id}->{server}->put($req);」を実行して リクエスト ($req) を Webサーバに送信します。 そして Webサーバからレスポンスを受信したとき (ServerInput)、 sendRes を実行します。

sub sendRes {
    my ($id, $res) = @_;
    $IdleTimer = $IdleTimerMax;
    if (defined $PollHeap && $PollHeap->{connected}) {
        for my $block (unpack "(a2048)*", $res) {
            $PollHeap->{server}->
                put(pack("nna*", $id, length($block), $block));
        }
    }
}

sendRes は Webサーバからのレスポンス ($res) を 2048バイトごとに分割し、 セッションID ($id) とデータ長 (length($block)) を付加した ブロックとしてリレーサーバに送信します。

以上をまとめたのが、relayagent スクリプト です。 ここで解説した機能の他、 http リクエストヘッダの Host: フィールドを書き換える機能も追加しています。

C 版の relayagent に比べると、 http レスポンスの書き換え機能や、 http 以外のプロトコルを通す機能などがない点や、 高負荷時の性能の検証が充分行なえていない点など、 そのまま実運用に使用するには難しい点もありそうですが、 少なくとも プロトタイピングなどの目的 (あるいは教育などの目的) ならば 充分使えそうです。

Filed under: システム構築・運用,プログラミングと開発環境 — hiroaki_sengoku @ 07:13

No Comments »

No comments yet.

RSS feed for comments on this post.

Leave a comment