取り寄せた本が届いたら DynamoDB Stream を元に Lambda 経由で Twitter に通知させる
人間は図書館を活用することで住民税の元を取ることが出来る.取り寄せ依頼した本が届いたり借りてる本の返却期限が近付いたりすると Twitter bot が通知してくれるようにしている.が,単純に cron で回していたので人間側の対応が遅れると何度も同じことを言われてしまう.
今回「今年もやるよ!AWS Lambda縛り Advent Calendar 2015 - Qiita」の 12/05 分担当として,上記の仕組みをDynamoDB の更新 Stream をイベントソースとして Lambda で一回だけ通知してくれるように改良する話を書く.
前提
図書館の web をスクレイピングして DynamoDB に格納する... ところは Lambda Advent Calendar としては本題ではないので省略*1.
うちの地区はこんな感じのページ.とりあえず人間が
- 本の取り寄せ依頼(予約)をしたとき
- 取り寄せた本が届いた時
- 本を借りた時
- 本を返した時
といった行動をした際に,日付,書籍 ISBN,行動タイプが以下の様な形式で DynamoDB に格納される機能が実装済とお考えください.
{ "date": "2015-12-02", "hashed_title": "7f550dd47d40ea31eb84c092dbcdd2f66ff960db", "isbn": "9784000236218", "type": "borrow", "uuid": "b946c18b-0aa9-45fc-938e-be5b52d1fc33" }
なお書籍データ本体は別の場所に ISBN をハッシュキーとして格納している. ...で,該当テーブルに対して DynamoDB Stream を有効化する.DynamoDB Stream はリリース当初 Preview 版で利用申請が必要だったが今は誰でも使える.
DynamoDB ストリームは、Amazon DynamoDB テーブル内の項目に加えられた変更に関する情報の順序付けされた情報です。テーブルでストリームを有効にすると、DynamoDB はテーブル内のデータ項目に加えられた各変更に関する情報をキャプチャします。 ...
DynamoDB ストリーム は、ストリームレコードをほぼリアルタイムで書き込むため、これらのストリームを使用し、内容に基づいてアクションを実行するアプリケーションを構築できます。
引用元: DynamoDB ストリーム を使用したテーブルアクティビティのキャプチャ - Amazon DynamoDB
DynamoDB マネジメントコンソールからテーブルを選択してぽちぽちやると有効化される.
すると,こんな感じの Stream ARN が得られる.
arn:aws:dynamodb:ap-northeast-1:<MY_AWS_ID>:table/<MY_TABLE_NAME>/stream/<Stream を有効化した時刻>
ARN = Amazon Resource Names は AWS サービスにおいて何らかのリソース (EC2 インスタンス, IAM ユーザ,DynamoDB のテーブル etc) を URI よろしく一意に識別する ID のようなもので,AWS サービス間で何か連携しようとするとよく出てくる.
Lambda で DynamoDB Stream を受け取る
ようやく Lambda.先の操作で得られた Stream ARN をイベントソースとして受け取り発火させる設定を行う. ゼロから書き始めてもいいんだけど,マネジメントコンソールで "Create a Lambda Function" を選択,"dynamodb-process-stream" という blueprint を元にすると話が早い.
console.log('Loading function'); exports.handler = function(event, context) { //console.log('Received event:', JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { console.log(record.eventID); console.log(record.eventName); console.log('DynamoDB Record: %j', record.dynamodb); }); context.succeed("Successfully processed " + event.Records.length + " records."); };
event.Records
の中に Stream の更新情報がどばっと入ってきている.たとえば以下の様な形.
{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { // 更新レコード情報 "Keys": { "uuid": { "S": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" } }, "NewImage": { "date": "2015-12-03", "hashed_title": "7f550dd47d40ea31eb84c092dbcdd2f66ff960db", "isbn": "9784000236218", "type": "return", "uuid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "ap-northeast-1", "eventName": "INSERT", // レコードの新規作成 "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:<MY_AWS_ID>:table/<MY_TABLE_NAME>/stream/<Stream を有効化した時刻>", "eventSource": "aws:dynamodb" } ] }
先の Stream 有効化時に "New and old Images" を選択していると,更新前/更新後のレコードがフルフルで入ってくる.単に「なんか更新合ったよ」という事実を知りたいだけなら "Keys Only" でいいと思う. Stream から取得できる "Records > dynamodb > NewImage ..." あたりの内容を使えば Twitter 通知に必要な情報は取れそう.
Twitter API
今回やりたいことのためには Twitter API を Lambda から叩く必要があるが,その名も twittter という npm package で簡単に実現できる.
var client = new Twitter({ consumer_key: <consumer_key>, consumer_secret: <consumer_secret>, access_token_key: <access_token_key>, access_token_secret: <access_token_secret> }); var msg = "予約していた『" + event.Records[0].dynamodb.NewImage.title + "』が図書館で確保済です. (" + event.Records[0].dynamodb.NewImage.date + "迄)"; client.post('/statuses/update.json', {status: msg}, function(err, ...) { // ... });
こんなイメージのコードを書けば,Lambda から bot を喋らせることが出来る.
ところでちょっと余談気味だけど,consumer_key
とか access_token_secret
とかを zip に含めて Lambda にアップロード,ってのはちょっとせきゅりてぃ的な意味でやりたくない.そこで AWS Key Management Service (KMS) を使うようにした.KMS のコンセプトについてはクラメソさんの 10分でわかる!Key Management Serviceの仕組み #cmdevio | Developers.IO などを参照にするとわかりやすい気がする.
流れとしては,あらかじめ KMS の Master Key を作っておいた後,その key-id を指定して Twitter API の各種 API credentials 情報をカンマで接続したテキストを暗号化.
$ aws kms encrypt --profile my-profile --key-id 9xxxxxx-9999-aaaa-a996-ffffffffffff --plaintext "consumer_key:<my_consumer_key>,consumer_secret:<my_consumer_secret>,access_token:<my_access_token>,access_token_secret:<my_access_token_secret>" --query CiphertextBlob --output text | base64 -D > ./encrypted-credentials
この encrypted-credentials
を zip に含め,ソースコードと一緒に Lambda にあげてやる.
Lambda 上では毎回実行時に KMS へのリクエストを行い,credentials を複合する.Lambda Function に紐付けた Role からのみ復号可能となるように IAM 的に制限することができる,という仕組み.Lambda 上で動作する KMS decrypt 部分のコードは以下.
var encrypted = fs.readFileSync('./encrypted-credentials'); // コードと一緒に上げた暗号化済 credentials 情報 kms.decrypt({CiphertextBlob: encrypted}, function(err, data) { if (err) { console.log(err, err.stack); context.fail('failed'); } else { var decrypted = data['Plaintext'].toString(); // console.log(decrypted); obj = {}; decrypted.split(',').forEach(function(item) { var pair = item.split(':'); obj[pair[0]] = pair[1]; }); // Lambda 上で復号した credentials 情報を使って Twitter API を叩く var client = new Twitter({ consumer_key: obj.consumer_key, consumer_secret: obj.consumer_secret, access_token_key: obj.access_token_key, access_token_secret: obj.access_token_secret }); // ...
いじょ
もう少し DynamoDB Stream + Lambda の汎用的な話にするつもりだったんだけど,ざっくり用例を一つ紹介する記事になってしまった.がんばります.