読者です 読者をやめる 読者になる 読者になる

ミームの死骸を待ちながら

We are built as gene machines and cultured as meme machines, but we have the power to turn against our creators. We, alone on earth, can rebel against the tyranny of the selfish replicators. - Richard Dawkins "Selfish Gene"

We are built as gene machines and cultured as meme machines, but we have the power to turn against our creators.
We, alone on earth, can rebel against the tyranny of the selfish replicators.
- Richard Dawkins "Selfish Gene"

Spark on EMR で 1000 Genomes Project のデータを触る

なんやかんやで年始から記事書いてないですが, 外部要因がないとブログを書かないようになっている. "Distributed computing (Apache Hadoop, Spark, ...) Advent Calendar 2016" の 12/11 担当として何かを書きます.

qiita.com

1000 Genomes Project public data set on S3

AWS S3 には無造作にいろんなびっぐなでーたが置いてあり,

aws.amazon.com

その中に 1000 Genomes Project のデータがあります: 1000 Genomes Project and AWS.

1000 Genomes Project については公式ページ 1000 Genomes | A Deep Catalog of Human Genetic Variation などを参照して頂ければと思いますが, 複数民族をソースとして匿名の 1000 人以上のゲノム配列を決定しよう, というプロジェクトで既に完了しています. なんで 1000 人も要るかって言うとヒトのみならず生物ってのは同じ種でも個体ごとに微妙にゲノム配列に差があってその差がいわゆる "体質" とか "病気のかかりやすさ" 的な差を生み出しているからで, ヒトの標準的なゲノムは何なのか, ということを知る意味でもある程度の N が必要だからです. ともあれ, そのデータは匿名化された上で公開されており, S3 上にも上がっているというわけです.

aws cli が使えるなら以下のコマンドですぐアクセスできます.

$ aws s3 ls s3://1000genomes
                           PRE alignment_indices/
                           PRE changelog_details/
                           PRE complete_genomics_indices/
                           PRE data/
                           PRE hgsv_sv_discovery/
                           PRE phase1/
                           PRE phase3/
                           PRE pilot_data/
                           PRE release/
                           PRE sequence_indices/
                           PRE technical/
2015-09-09 06:16:09       1663 20131219.populations.tsv
2015-09-09 06:17:01         97 20131219.superpopulations.tsv
2015-09-09 00:01:44     257098 CHANGELOG
2014-09-03 00:39:53      15977 README.alignment_data
2014-01-30 20:13:29       5289 README.analysis_history
2014-01-31 12:44:08       5967 README.complete_genomics_data
2014-08-29 09:22:38        563 README.crams
2013-08-07 01:11:58        935 README.ebi_aspera_info
2013-08-07 01:11:58       8408 README.ftp_structure
2014-09-03 06:19:43       2082 README.pilot_data
2014-09-03 21:33:15       1938 README.populations
2013-08-07 01:11:58       7857 README.sequence_data
2015-06-19 03:28:31        672 README_missing_files_20150612
2015-06-04 04:43:32        136 README_phase3_alignments_sequence_20150526
2015-06-19 01:34:45        273 README_phase3_data_move_20150612
2014-09-03 21:34:30    3579471 alignment.index
2014-09-03 21:32:59   54743580 analysis.sequence.index
2014-09-03 21:34:57    3549051 exome.alignment.index
2014-09-03 21:35:15   67069489 sequence.index

Adam

今回は s3://1000genomes/release/ 配下の多型データを使いますが, これは Variant Call Format (VCF) という形式で保存されておりこのままでは Spark で扱うことが出来ません. そこでマエショリが必要になってくるのですが, Spark でゲノムデータ解析をする際は Adam というパッケージがデファクトスタンダードっぽくなっているので, 以下それを使って Parquet ファイル (相当) に変換します.

GitHub - bigdatagenomics/adam: ADAM is a genomics analysis platform with specialized file formats built using Apache Avro, Apache Spark and Parquet. Apache 2 licensed.

EMR クラスタ (Release label: emr-5.1.0 *1 ) を立て, Adam をビルドします. maven, sbt は適当に入れているものとします. EMR 5.1.0 は Spark 2.0.1 がバンドルされており Scala のバージョンは 2.11.8 なので, ./scripts/ 配下の適切なスクリプトを流す.

$ git clone https://github.com/bigdatagenomics/adam.git ~/adam
$ cd !$
$ ./scripts/move_to_scala_2.11.sh
$ ./scripts/move_to_spark_2.sh
$ MAVEN_OPTS="-Xmx512m -XX:MaxPermSize=256m" mvn clean package -DskipTests

モノがビルドされると, spark-shell, spark-submit を拡張した adam-shell, adam-submit なるコマンドが使えるようになります.

[hadoop@ip-172-31-12-100 adam]$ ./bin/adam-shell
Using SPARK_SHELL=/usr/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/11 13:57:32 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/12/11 13:57:41 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.31.12.100:4040
Spark context available as 'sc' (master = yarn, app id = application_1479477268424_0036).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

VCF to "Adam Format" (Parquet に毛の生えたフォーマット)

adam-submit のサブコマンドに vcf2adam というものがあり, これを噛ませることで VCF ファイルを ADAM Format に変換します. 変換元に先述の S3 1000genomes バケットを指定し, 出力先として自分の手持ちのバケットを指定する感じです. ゲノムデータは染色体単位で分かれて記録されているので, とりあえずトリソミーになるとダウン症を発症する 21 番染色体を選んで変換してみようと思いますが, まずは手元に落として, 少し VCF ファイルを覗いてみます.

$ aws s3 cp s3://1000genomes/release/20130502/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz
 /home/hadoop/

// 冒頭部分
$ zcat ~/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz | head -n 6
##fileformat=VCFv4.1
##FILTER=<ID=PASS,Description="All filters passed">
##fileDate=20150218
##reference=ftp://ftp.1000genomes.ebi.ac.uk//vol1/ftp/technical/reference/phase2_reference_assembly_sequence/hs37d5.fa.gz
##source=1000GenomesPhase3Pipeline
##contig=<ID=1,assembly=b37,length=249250621>

// 行き来しながら探したところ 253 行目がヘッダーの模様.
$ zcat ~/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz | head -n 253 | tail -n 1
#CHROM  POS     ID      REF     ALT     QUAL    FILTER  INFO    FORMAT  HG00096 HG00097 HG00099 HG00100 HG00101 HG00102 HG00103 HG00105 HG00106 HG00107      HG00108 HG00109 HG00110 HG00111 HG00112 HG00113 HG00114 HG00115 HG00116 HG00117 HG00118 ... (省略

// データの中身.
$ zcat ~/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz | head -n 1000 | tail -n 1 | head -c 200
21      9440889 rs574517239     A       C       100     PASS    AC=1;AF=0.000199681;AN=5008;NS=2504;DP=13457;EAS_AF=0;AMR_AF=0;AFR_AF=0.0008;EUR_AF=0;SAS_AF=0;AA=.|||;VT=SNP        GT      0|0     0|0     0|0     0|0     0|0     0|0     0|0     0|0     0|0     0|0     0|0     0|0     0|0 ... (省略

では, adam-submit 叩きます. adam-submit コマンドは spark 用のオプションと adam オプションの間に -- を入れる必要があります. ちょっとハマりました.

$ ./bin/adam-submit \
--deploy-mode cluster \
--master yarn \
--num-executors 10 \
--executor-memory 20G \
-- \
vcf2adam \
s3://1000genomes/release/20130502/ALL.chr21.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz \
s3://hash-genome-work/chr21.v5/

行けましたね. コアノード 2 台でやっていましたが, この辺で処理が遅いことに気が付き金で殴るべく m3.2xlarge x 10 台に増設しています. そのスペックで 21 番染色体の VCF の Adam フォーマット変換処理に要する時間が 1.5hr 程度でした.

結果.

$ aws s3 ls --recursive s3://hash-genome-work/chr21.v5
2016-12-12 02:39:55          0 chr21.v5/_SUCCESS
2016-12-12 02:39:56      11895 chr21.v5/_common_metadata
2016-12-12 02:39:56       5548 chr21.v5/_header
2016-12-12 02:39:56      29687 chr21.v5/_metadata
2016-12-12 02:39:56      28835 chr21.v5/_samples.avro
2016-12-12 02:39:56       3466 chr21.v5/_seqdict.avro
2016-12-12 02:39:47  119661147 chr21.v5/part-r-00000.gz.parquet
2016-12-12 02:39:50  117920329 chr21.v5/part-r-00001.gz.parquet
2016-12-12 02:39:52  117938957 chr21.v5/part-r-00002.gz.parquet
2016-12-12 02:39:54   24165424 chr21.v5/part-r-00003.gz.parquet
2016-12-12 01:08:37          0 chr21.v5_$folder$

Parquet 化された多型データを adam-shell から参照

adam-shell で色々見てみたいのですが, AdamContext だと S3 からは直接ロードが出来ず HDFS に置く必要があるっぽいので先に HDFS に書き出します.

$ aws s3 cp --recursive s3://hash-genome-work/chr21.v5 /home/hadoop/chr21.parquet
$ hadoop fs -mkdir /user/hadoop/ch21
$ hadoop fs -put /home/hadoop/chr21.parquet/*.parquet /user/hadoop/ch21/

$ hadoop fs -ls /user/hadoop/
Found 2 items
drwxr-xr-x   - hadoop hadoop          0 2016-12-11 19:18 /user/hadoop/.sparkStaging
drwxr-xr-x   - hadoop hadoop          0 2016-12-11 19:35 /user/hadoop/ch21

$ hadoop fs -ls /user/hadoop/ch21/
Found 4 items
-rw-r--r--   3 hadoop hadoop  119661147 2016-12-11 19:35 /user/hadoop/ch21/part-r-00000.gz.parquet
-rw-r--r--   3 hadoop hadoop  117920329 2016-12-11 19:35 /user/hadoop/ch21/part-r-00001.gz.parquet
-rw-r--r--   3 hadoop hadoop  117938957 2016-12-11 19:35 /user/hadoop/ch21/part-r-00002.gz.parquet
-rw-r--r--   3 hadoop hadoop   24165424 2016-12-11 19:35 /user/hadoop/ch21/part-r-00003.gz.parquet

さて, ようやく Adam Shell からデータを読めそうです.

$ ./bin/adam-shell

scala> val data = spark.read.parquet("/user/hadoop/ch21/*.parquet")
data: org.apache.spark.sql.DataFrame = [variant: struct<contigName: string, start: bigint ... 8 more fields>, contigName: string ... 20 more fields]

scala> data.count
res13: Long = 2785018912

約 27.8 億レコードと, 21 番染色体の variant だけでけっこうなデータ量. スキーマ見てみます.

scala> data.first
res14: org.apache.spark.sql.Row = [[null,null,null,WrappedArray(rs559462325),G,A,true,true,WrappedArray(),false],21,9411238,9411239,[true,true,WrappedArray(),null,null,null,null,null,null,null,WrappedArray(),WrappedArray(),null,null,Map()],HG00096,null,null,WrappedArray(REF, REF),null,null,null,null,null,null,WrappedArray(),WrappedArray(),WrappedArray(),false,true,null,null]

scala> data.printSchema
root
 |-- variant: struct (nullable = true)
 |    |-- contigName: string (nullable = true)
 |    |-- start: long (nullable = true)
 |    |-- end: long (nullable = true)
 |    |-- names: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- referenceAllele: string (nullable = true)
 |    |-- alternateAllele: string (nullable = true)
 |    |-- filtersApplied: boolean (nullable = true)
 |    |-- filtersPassed: boolean (nullable = true)
 |    |-- filtersFailed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- somatic: boolean (nullable = true)
 |-- contigName: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- variantCallingAnnotations: struct (nullable = true)
 |    |-- filtersApplied: boolean (nullable = true)
 |    |-- filtersPassed: boolean (nullable = true)
 |    |-- filtersFailed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- downsampled: boolean (nullable = true)
 |    |-- baseQRankSum: float (nullable = true)
 |    |-- fisherStrandBiasPValue: float (nullable = true)
 |    |-- rmsMapQ: float (nullable = true)
 |    |-- mapq0Reads: integer (nullable = true)
 |    |-- mqRankSum: float (nullable = true)
 |    |-- readPositionRankSum: float (nullable = true)
 |    |-- genotypePriors: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- genotypePosteriors: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |    |-- vqslod: float (nullable = true)
 |    |-- culprit: string (nullable = true)
 |    |-- attributes: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- sampleId: string (nullable = true)
 |-- sampleDescription: string (nullable = true)
 |-- processingDescription: string (nullable = true)
 |-- alleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- expectedAlleleDosage: float (nullable = true)
 |-- referenceReadDepth: integer (nullable = true)
 |-- alternateReadDepth: integer (nullable = true)
 |-- readDepth: integer (nullable = true)
 |-- minReadDepth: integer (nullable = true)
 |-- genotypeQuality: integer (nullable = true)
 |-- genotypeLikelihoods: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- nonReferenceLikelihoods: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- strandBiasComponents: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- phased: boolean (nullable = true)
 |-- phaseSetId: integer (nullable = true)
 |-- phaseQuality: integer (nullable = true)

わお. いくつか取ってみます.

scala> data.first.get(1) // contigName. このデータでは染色体番号が入ってるので 21
res15: Any = 21

scala> data.first.get(0)
res17: Any = [null,null,null,WrappedArray(rs559462325),G,A,true,true,WrappedArray(),false]
// variant の情報. referenceAllele = G で alternateAllele = A なので,
// リファレンス配列では G のところが A になってる変異のレコード, ということになる

scala> data.first.get(5)
res21: Any = HG00096
// sampleId.

sampleId HG00096 について少し調べてみると, 年齢不詳イギリス人男性の B 細胞からサンプリングされたものであることがわかる.

なにか面白い解析が出来るとよかったのだけどぱっと思いつかなかったので一旦ここまでで公開. 何か思いついたら追記するかも.

とりあえず一発でアクセス可能な場所 (S3) に世界的な科学研究の成果が転がっていて, データを使ってあれこれ弄れる形まで持って行くのは案外簡単ですよ, というあたりが伝われば幸いです.

*1:最新版は EMR 5.2.0 ですが手元に立ってたのが 5.1.0 だったので...