Spark on EMR で 1000 Genomes Project のデータを触る
なんやかんやで年始から記事書いてないですが, 外部要因がないとブログを書かないようになっている. "Distributed computing (Apache Hadoop, Spark, ...) Advent Calendar 2016" の 12/11 担当として何かを書きます.
1000 Genomes Project public data set on S3
AWS S3 には無造作にいろんなびっぐなでーたが置いてあり,
その中に 1000 Genomes Project のデータがあります: 1000 Genomes Project and AWS.
1000 Genomes Project については公式ページ 1000 Genomes | A Deep Catalog of Human Genetic Variation などを参照して頂ければと思いますが, 複数民族をソースとして匿名の 1000 人以上のゲノム配列を決定しよう, というプロジェクトで既に完了しています. なんで 1000 人も要るかって言うとヒトのみならず生物ってのは同じ種でも個体ごとに微妙にゲノム配列に差があってその差がいわゆる "体質" とか "病気のかかりやすさ" 的な差を生み出しているからで, ヒトの標準的なゲノムは何なのか, ということを知る意味でもある程度の N が必要だからです. ともあれ, そのデータは匿名化された上で公開されており, S3 上にも上がっているというわけです.
aws cli が使えるなら以下のコマンドですぐアクセスできます.
$ aws s3 ls s3://1000genomes PRE alignment_indices/ PRE changelog_details/ PRE complete_genomics_indices/ PRE data/ PRE hgsv_sv_discovery/ PRE phase1/ PRE phase3/ PRE pilot_data/ PRE release/ PRE sequence_indices/ PRE technical/ 2015-09-09 06:16:09 1663 20131219.populations.tsv 2015-09-09 06:17:01 97 20131219.superpopulations.tsv 2015-09-09 00:01:44 257098 CHANGELOG 2014-09-03 00:39:53 15977 README.alignment_data 2014-01-30 20:13:29 5289 README.analysis_history 2014-01-31 12:44:08 5967 README.complete_genomics_data 2014-08-29 09:22:38 563 README.crams 2013-08-07 01:11:58 935 README.ebi_aspera_info 2013-08-07 01:11:58 8408 README.ftp_structure 2014-09-03 06:19:43 2082 README.pilot_data 2014-09-03 21:33:15 1938 README.populations 2013-08-07 01:11:58 7857 README.sequence_data 2015-06-19 03:28:31 672 README_missing_files_20150612 2015-06-04 04:43:32 136 README_phase3_alignments_sequence_20150526 2015-06-19 01:34:45 273 README_phase3_data_move_20150612 2014-09-03 21:34:30 3579471 alignment.index 2014-09-03 21:32:59 54743580 analysis.sequence.index 2014-09-03 21:34:57 3549051 exome.alignment.index 2014-09-03 21:35:15 67069489 sequence.index
Adam
今回は s3://1000genomes/release/
配下の多型データを使いますが, これは Variant Call Format (VCF) という形式で保存されておりこのままでは Spark で扱うことが出来ません. そこでマエショリが必要になってくるのですが, Spark でゲノムデータ解析をする際は Adam というパッケージがデファクトスタンダードっぽくなっているので, 以下それを使って Parquet ファイル (相当) に変換します.
EMR クラスタ (Release label: emr-5.1.0 *1 ) を立て, Adam をビルドします. maven, sbt は適当に入れているものとします. EMR 5.1.0 は Spark 2.0.1 がバンドルされており Scala のバージョンは 2.11.8 なので, ./scripts/
配下の適切なスクリプトを流す.
$ git clone https://github.com/bigdatagenomics/adam.git ~/adam $ cd !$ $ ./scripts/move_to_scala_2.11.sh $ ./scripts/move_to_spark_2.sh $ MAVEN_OPTS="-Xmx512m -XX:MaxPermSize=256m" mvn clean package -DskipTests
モノがビルドされると, spark-shell, spark-submit を拡張した adam-shell, adam-submit なるコマンドが使えるようになります.
[hadoop@ip-172-31-12-100 adam]$ ./bin/adam-shell Using SPARK_SHELL=/usr/bin/spark-shell Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). 16/12/11 13:57:32 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. 16/12/11 13:57:41 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect. Spark context Web UI available at http://172.31.12.100:4040 Spark context available as 'sc' (master = yarn, app id = application_1479477268424_0036). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.0.1 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111) Type in expressions to have them evaluated. Type :help for more information. scala>
VCF to "Adam Format" (Parquet に毛の生えたフォーマット)
adam-submit のサブコマンドに vcf2adam
というものがあり, これを噛ませることで VCF ファイルを ADAM Format に変換します. 変換元に先述の S3 1000genomes バケットを指定し, 出力先として自分の手持ちのバケットを指定する感じです. ゲノムデータは染色体単位で分かれて記録されているので, とりあえずトリソミーになるとダウン症を発症する 21 番染色体を選んで変換してみようと思いますが, まずは手元に落として, 少し VCF ファイルを覗いてみます.
$ aws s3 cp s3://1000genomes/release/20130502/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz /home/hadoop/ // 冒頭部分 $ zcat ~/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz | head -n 6 ##fileformat=VCFv4.1 ##FILTER=<ID=PASS,Description="All filters passed"> ##fileDate=20150218 ##reference=ftp://ftp.1000genomes.ebi.ac.uk//vol1/ftp/technical/reference/phase2_reference_assembly_sequence/hs37d5.fa.gz ##source=1000GenomesPhase3Pipeline ##contig=<ID=1,assembly=b37,length=249250621> // 行き来しながら探したところ 253 行目がヘッダーの模様. $ zcat ~/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz | head -n 253 | tail -n 1 #CHROM POS ID REF ALT QUAL FILTER INFO FORMAT HG00096 HG00097 HG00099 HG00100 HG00101 HG00102 HG00103 HG00105 HG00106 HG00107 HG00108 HG00109 HG00110 HG00111 HG00112 HG00113 HG00114 HG00115 HG00116 HG00117 HG00118 ... (省略 // データの中身. $ zcat ~/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz | head -n 1000 | tail -n 1 | head -c 200 21 9440889 rs574517239 A C 100 PASS AC=1;AF=0.000199681;AN=5008;NS=2504;DP=13457;EAS_AF=0;AMR_AF=0;AFR_AF=0.0008;EUR_AF=0;SAS_AF=0;AA=.|||;VT=SNP GT 0|0 0|0 0|0 0|0 0|0 0|0 0|0 0|0 0|0 0|0 0|0 0|0 0|0 ... (省略
では, adam-submit 叩きます. adam-submit コマンドは spark 用のオプションと adam オプションの間に --
を入れる必要があります. ちょっとハマりました.
$ ./bin/adam-submit \ --deploy-mode cluster \ --master yarn \ --num-executors 10 \ --executor-memory 20G \ -- \ vcf2adam \ s3://1000genomes/release/20130502/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz \ s3://hash-genome-work/chr21.v5/
行けましたね. コアノード 2 台でやっていましたが, この辺で処理が遅いことに気が付き金で殴るべく m3.2xlarge x 10 台に増設しています. そのスペックで 21 番染色体の VCF の Adam フォーマット変換処理に要する時間が 1.5hr 程度でした.
結果.
$ aws s3 ls --recursive s3://hash-genome-work/chr21.v5 2016-12-12 02:39:55 0 chr21.v5/_SUCCESS 2016-12-12 02:39:56 11895 chr21.v5/_common_metadata 2016-12-12 02:39:56 5548 chr21.v5/_header 2016-12-12 02:39:56 29687 chr21.v5/_metadata 2016-12-12 02:39:56 28835 chr21.v5/_samples.avro 2016-12-12 02:39:56 3466 chr21.v5/_seqdict.avro 2016-12-12 02:39:47 119661147 chr21.v5/part-r-00000.gz.parquet 2016-12-12 02:39:50 117920329 chr21.v5/part-r-00001.gz.parquet 2016-12-12 02:39:52 117938957 chr21.v5/part-r-00002.gz.parquet 2016-12-12 02:39:54 24165424 chr21.v5/part-r-00003.gz.parquet 2016-12-12 01:08:37 0 chr21.v5_$folder$
Parquet 化された多型データを adam-shell から参照
adam-shell で色々見てみたいのですが, AdamContext だと S3 からは直接ロードが出来ず HDFS に置く必要があるっぽいので先に HDFS に書き出します.
$ aws s3 cp --recursive s3://hash-genome-work/chr21.v5 /home/hadoop/chr21.parquet $ hadoop fs -mkdir /user/hadoop/ch21 $ hadoop fs -put /home/hadoop/chr21.parquet/*.parquet /user/hadoop/ch21/ $ hadoop fs -ls /user/hadoop/ Found 2 items drwxr-xr-x - hadoop hadoop 0 2016-12-11 19:18 /user/hadoop/.sparkStaging drwxr-xr-x - hadoop hadoop 0 2016-12-11 19:35 /user/hadoop/ch21 $ hadoop fs -ls /user/hadoop/ch21/ Found 4 items -rw-r--r-- 3 hadoop hadoop 119661147 2016-12-11 19:35 /user/hadoop/ch21/part-r-00000.gz.parquet -rw-r--r-- 3 hadoop hadoop 117920329 2016-12-11 19:35 /user/hadoop/ch21/part-r-00001.gz.parquet -rw-r--r-- 3 hadoop hadoop 117938957 2016-12-11 19:35 /user/hadoop/ch21/part-r-00002.gz.parquet -rw-r--r-- 3 hadoop hadoop 24165424 2016-12-11 19:35 /user/hadoop/ch21/part-r-00003.gz.parquet
さて, ようやく Adam Shell からデータを読めそうです.
$ ./bin/adam-shell scala> val data = spark.read.parquet("/user/hadoop/ch21/*.parquet") data: org.apache.spark.sql.DataFrame = [variant: struct<contigName: string, start: bigint ... 8 more fields>, contigName: string ... 20 more fields] scala> data.count res13: Long = 2785018912
約 27.8 億レコードと, 21 番染色体の variant だけでけっこうなデータ量. スキーマ見てみます.
scala> data.first res14: org.apache.spark.sql.Row = [[null,null,null,WrappedArray(rs559462325),G,A,true,true,WrappedArray(),false],21,9411238,9411239,[true,true,WrappedArray(),null,null,null,null,null,null,null,WrappedArray(),WrappedArray(),null,null,Map()],HG00096,null,null,WrappedArray(REF, REF),null,null,null,null,null,null,WrappedArray(),WrappedArray(),WrappedArray(),false,true,null,null] scala> data.printSchema root |-- variant: struct (nullable = true) | |-- contigName: string (nullable = true) | |-- start: long (nullable = true) | |-- end: long (nullable = true) | |-- names: array (nullable = true) | | |-- element: string (containsNull = true) | |-- referenceAllele: string (nullable = true) | |-- alternateAllele: string (nullable = true) | |-- filtersApplied: boolean (nullable = true) | |-- filtersPassed: boolean (nullable = true) | |-- filtersFailed: array (nullable = true) | | |-- element: string (containsNull = true) | |-- somatic: boolean (nullable = true) |-- contigName: string (nullable = true) |-- start: long (nullable = true) |-- end: long (nullable = true) |-- variantCallingAnnotations: struct (nullable = true) | |-- filtersApplied: boolean (nullable = true) | |-- filtersPassed: boolean (nullable = true) | |-- filtersFailed: array (nullable = true) | | |-- element: string (containsNull = true) | |-- downsampled: boolean (nullable = true) | |-- baseQRankSum: float (nullable = true) | |-- fisherStrandBiasPValue: float (nullable = true) | |-- rmsMapQ: float (nullable = true) | |-- mapq0Reads: integer (nullable = true) | |-- mqRankSum: float (nullable = true) | |-- readPositionRankSum: float (nullable = true) | |-- genotypePriors: array (nullable = true) | | |-- element: float (containsNull = true) | |-- genotypePosteriors: array (nullable = true) | | |-- element: float (containsNull = true) | |-- vqslod: float (nullable = true) | |-- culprit: string (nullable = true) | |-- attributes: map (nullable = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) |-- sampleId: string (nullable = true) |-- sampleDescription: string (nullable = true) |-- processingDescription: string (nullable = true) |-- alleles: array (nullable = true) | |-- element: string (containsNull = true) |-- expectedAlleleDosage: float (nullable = true) |-- referenceReadDepth: integer (nullable = true) |-- alternateReadDepth: integer (nullable = true) |-- readDepth: integer (nullable = true) |-- minReadDepth: integer (nullable = true) |-- genotypeQuality: integer (nullable = true) |-- genotypeLikelihoods: array (nullable = true) | |-- element: float (containsNull = true) |-- nonReferenceLikelihoods: array (nullable = true) | |-- element: float (containsNull = true) |-- strandBiasComponents: array (nullable = true) | |-- element: integer (containsNull = true) |-- splitFromMultiAllelic: boolean (nullable = true) |-- phased: boolean (nullable = true) |-- phaseSetId: integer (nullable = true) |-- phaseQuality: integer (nullable = true)
わお. いくつか取ってみます.
scala> data.first.get(1) // contigName. このデータでは染色体番号が入ってるので 21 res15: Any = 21 scala> data.first.get(0) res17: Any = [null,null,null,WrappedArray(rs559462325),G,A,true,true,WrappedArray(),false] // variant の情報. referenceAllele = G で alternateAllele = A なので, // リファレンス配列では G のところが A になってる変異のレコード, ということになる scala> data.first.get(5) res21: Any = HG00096 // sampleId.
sampleId HG00096 について少し調べてみると, 年齢不詳イギリス人男性の B 細胞からサンプリングされたものであることがわかる.
なにか面白い解析が出来るとよかったのだけどぱっと思いつかなかったので一旦ここまでで公開. 何か思いついたら追記するかも.
とりあえず一発でアクセス可能な場所 (S3) に世界的な科学研究の成果が転がっていて, データを使ってあれこれ弄れる形まで持って行くのは案外簡単ですよ, というあたりが伝われば幸いです.
*1:最新版は EMR 5.2.0 ですが手元に立ってたのが 5.1.0 だったので...