Software Engineer の覚書

日々仕事などで調べたこと・考えたことを綴っていきます。

PyCon APAC 2019 で "Building an Analytics Workflow using Airflow" について発表しました

表題の通り、2019/2/23/〜24に開催されたPyCon APAC 2019にてAirflowを使ったアナリティクスワークフローについて発表してきました。

www.youtube.com

www.slideshare.net

PyCon APACとは

PyCon APACは、PythonユーザコミュニティによるPythonに関するカンファレンスで、その名の通り、毎年Asia Pacificのいずれかの国で開催されています。 元々は、PyCon Singaporeの方が始めたこともあり、シンガポールでの開催が多いようです。2018年もシンガポールで開催、2019年は初のフィリピン・マニラでの開催でした。

PyConでは、言語自体のセッションはもとより、Webだけでなく、最近ではデータサイエンスやデータエンジニアリングの分野でPythonが活用されていることもあり、データ関係のセッションが増えているようです。

参加の理由

ここ1年半ほどはデータエンジニアリングの仕事をしていて、PySparkでのデータ処理やAirflowでのデータ分析用のワークフロー構築など、主にPythonを活用している機会が多かったため、いつかPyConで発表してみようと思っていました。

また、半年前くらいからSingaporeで働いているため、東南アジアでPyConが開催されておりには発表を申し込んでやろうと思っていたところ、ちょうどマニラでの開催で決まったので応募しました。マニラはシンガポールから4時間ほどと、まあそれほど近くはないですが、日本から行くよりは近いので^^; 応募してみました。

初マニラの感想

実は、マニラに行く前は、東南アジアで行ったことあるのはシンガポールだけだったので、嫁さんからは「お前は本当の東南アジアを知らない」とバカにされていましたが、マニラに来て圧倒されました。 ものすごい大渋滞、ビジネス街ですらあちこち道が壊れてていて、ボロボロの家があったり、やはり日本とは全く違う雰囲気です。

しかし、フィリピン自体は大手ベンダー(欧米系やら東南アジアなど)が開発拠点を作るなど、東南アジアでのソフトウェアエンジニアの仕事のアウトソース先としてかなり発展してきているそうです。500人近くの参加者の中で、地元の人が過半数。皆、若くて、これから頑張ってやろうと野心にみなぎっていました。まずは、いい仕事について、シンガポールなど発展した仕事を見つけるため、やる気満々です。この辺は成熟した日本とは少し違うかなと思いました。

発表内容 "Building an Analytics Workflow using Airflow"

発表では、以下を紹介しました。要はここ1年くらい私のチームが辿った軌跡を良かったこと、悪かったことをふまえなぞり、今ならComposer1択ですとおすすめして締めました。発表中には1個だけ質問もらいましたが、降壇した後も4-5人から質問されて、Airflowの盛り上がりを感じると共に発表してよかったなと思いました。

  • データ分析用のワークフローの定義や簡単な例
  • Airflowの概要と使い方
  • プロダクションで使う際のAirflowクラスタの構築と課題
  • Composer(最近、GCPがリリースしたマネージドのAirflowサービス)の概要と簡単な使い方

発表に至った経緯を説明すると、私の所属するチームでは、Python使いが多いため、Pythonでワークフローを書け、どこの環境でも動かせるOSSということで、Airflowを選び、1年以上プロダクションでも活用しています。私も1年ほどAirflow職人をやっていますが、非常に使い勝手がよく、良い選択をしたと満足感があります。

ただし、やはりクラスタを管理するにはインフラ寄りのスキルセットを持った人の時間をインフラ管理に投資する必要があります。しかし、最近ではインフラ管理よりもビジネス価値に直結するアプリ寄りに時間をさく方が良いと考え、マネージドサービス(Composer)を活用しています。やはり生産性が全く違うので、非常に満足しています。

次はPyCon Singapore 2019!

PyCon SG 2019

次は地元シンガポールでPyCon Singapore 2019が2019年10月に開催されます。今回の発表でのフィードバックを受け、いくつか改良点(Sparkを使う方法など)を加えて、再度、発表を申し込もうと思います。

あるいは、最近、Apache Beam(Dataflow)も活用しているので、こちらのサービスのPythonでの利用について発表するのも良いかなと思います。

以上、続報をご期待ください。

Schema Registryを使ったKafkaにおけるAvro Schemaの管理について  

最近、仕事でSchema Registryの導入を進めているので、調べたことなどを記しておきたいと思います。

Schema Registryとは

www.confluent.io

Schema Registry stores a versioned history of all schemas and allows the evolution of schemas according to the configured compatibility settings. It also provides a plugin to clients that handles schema storage and retrieval for messages that are sent in Avro format.

Schema Registry とは、ここに書いている通り、スキーマのバージョン管理を行うツールです。Kafka のクライアント間でメッセージを Avro Schema でシリアライズして送信する際に、スキーマを Schema Registry に登録しておくことで、送信側と受信側が同じスキーマを利用していることを保証することができます。

https://www.confluent.io/wp-content/uploads/dwg_SchemaReg_howitworks.png

仕組みは次の通り。

  • まず、メッセージを送信する側が送りたいメッセージに対応したスキーマが存在するかどうか Schema Registry に問い合わせします。存在する場合は、レジストリが送信側に該当スキーマを渡します。
  • 次に送信側はスキーマを使ってメッセージをシリアライズして、ブローカに送信します。
  • 受信側もメッセージに合ったスキーマが Schema Registry に登録されていることを確認し、あればレジストリから取得し、メッセージのでデシリアライズに利用します。
1. The serializer places a call to the schema registry, to see if it has a format for the data the application wants to publish. If it does, schema registry passes that format to the application’s serializer, which uses it to filter out incorrectly formatted messages.
2. After checking the schema is authorized, it’s automatically serialized and there’s no effort you need to put into it. The message will, as expected, be delivered to the Kafka topic. 
3. Your consumers will handle deserialization, making sure your data pipeline can quickly evolve and continue to have clean data. You simply need to have all applications call the schema registry when publishing.

Schema Registry 利用のメリット

www.confluent.io

Schema Registry を利用することにはいくつかのメリットがあります。以下にある通り、

  • スキーマの管理:同じスキーマに対して複数のバージョンを管理でき、バージョン間の互換性を検証できる。

  • スキーマへのアクセス: Schema Registry 自体は REST API サーバであり、REST API を通じてスキーマの登録・取得を行う手段を提供する。

  • 自動的なシリアライゼーション: Kafkaのクライアントには、メッセージを自動的に Avro Schema に変換する機能があり、スキーマと互換性のないメッセージを送る場合は例外を発生させることができる。

Manage your Schema
Schemas are named and you can have multiple versions of the same schema. Schema Registry validates compatibility and warns about possible issues. This allows developers to add and remove fields independently to move faster. New schemas and versions are automatically registered and automatically validated, making the pushing to production process seamless.

Integrate with Standard Development Tools
To help you find out about compatibility issues as early as possible, Schema Registry includes Maven plugins, which enables integration of schema management and validation right into the development process.

Access your Schema
Schema Registry provides a REST API that allows any application to integrate and save or retrieve schemas. Additionally, formatters provide command line functionality for automatically converting JSON messages to make your data human friendly.

Provide Automated Serialization
Confluent Schema Registry is built right into Kafka Client serialization libraries—in every language. Writing serialized Apache Avro records to Kafka is as simple as configuring a producer with the Schema Registry serializers and sending Avro objects to Kafka. And if you use incompatible schema accidentally? That’s just an exception for the producer to handle. Incompatible data will never make it into Apache Kafka.

(補足)そもそも Avro Schema とは

Schema Registry — Confluent Platform

上記では、Schema Registry が Avro Schema のバージョン管理に使われると書きましたが、そもそも Avro Schema とはKafkaを通じて送信するデータのスキーマを記述する言語であり、JSONが厳密なフォーマットに欠けていることで発生する問題を解決するために登場しました。

(例)

  • メッセージのスキーマが変更されることはよく往々にしたるが、送信側がスキーマを変更した際に受信側が追従できないケースが多い。
  • JSONの記述方式が冗長であり、オーバヘッドが大きい。
Data consumers may not understand data producers: The lack of structure makes consuming data in these formats more challenging because fields can be arbitrarily added or removed, and data can even be corrupted. This drawback becomes more severe the more applications or teams across an organization begin consuming a data feed: if an upstream team can make arbitrary changes to the data format at their discretion, then it becomes very difficult to ensure that all downstream consumers will (continue to) be able to interpret the data. What's missing is a "contract" (cf. schema below) for data between the producers and the consumers, similar to the contract of an API.

Overhead and verbosity: They are verbose because field names and type information have to be explicitly represented in the serialized format, despite the fact that are identical across all messages.

KafkaにおいてデータをJSONで送る場合、単純にJSONの企画に沿った文字列に変換して送付します。一方、Avro Schema を利用する場合、事前に以下のようにメッセージのスキーマJSON形式で記述します。そして、送信側がメッセージを送る際に、このスキーマに基づいて、メッセージをより効率的なバイナリデータに変換してから送信します。受信側もこのスキーマを利用してメッセージをデシリアライズします。

{"namespace": "example.avro",
 "type": "record",
 "name": "user",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": "int"}
 ]
}

なお、Avro Schema は Kafka を含む Hadoop コミュニティの中で登場したバイナリフォーマットであり、Hadoop コミュニティでは頻繁に使われていますが、これだけが唯一のバイナリフォーマットではないです。現に、uberのエンジニアは、検証の結果、MessagePack というバイナリフォーマットと圧縮方式 zlib を組み合わせるのが一番効率が良いという記事を書いています。

eng.uber.com

Schema Registry の基本的な使い方

https://docs.confluent.io/current/schema-registry/using.html#common-sr-usage-examples

前述の通り、Schema Registry は REST API を提供しており、APIを通じてスキーマを登録したり、取得したりできます。

# Registering a New Version of a Schema Under the Subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key/versions
{"id":1}
# Listing All Subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]

スキーマは、subject name という識別子を指定して登録します。subject name は、デフォルトでは、以下の通り、トピックを使います。ただし、後述のリンクにある通り、subject name とスキーマの関係を決めるルールは変更可能です(ただし、自分で実装する必要あり)。

Schema Registry Serializer and Formatter — Confluent Platform

Java コードでのシリアライゼーション

https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer

  • まず、Schema Registry を使う場合は、Producer / Consumer のプロパティ schema.registry.url として Schema Registry の URL を設定する必要があります。
  • もっとも基本的なシリアライゼーション方法は、GenericRecord を使うことですが、見ての通り、ちょっとプリミティブすぎて使い勝手が悪そうです。
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer producer = new KafkaProducer(props);

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
                    "\"name\":\"myrecord\"," +
                    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
 
ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
try {
  producer.send(record);
} catch(SerializationException e) {
  // may need to do something with it
}

Java Object からの自動シリアライゼーション

代替策として、以下のライブラリを使うことで、コンパイル時に Avro Schema からメッセージの構造を表現した Java クラスを自動生成できます。

事前にスキーマを resource ディレクトリに置いておく必要があるという面倒はありますが、かなり自然に実装できる点はかなり嬉しいと思います。

final String orderId = "id" + Long.toString(i);
final Payment payment = new Payment(orderId, 1000.00d);
final ProducerRecord<String, Payment> record = new ProducerRecord<String, Payment>(TOPIC, payment.getId().toString(), payment);
producer.send(record);

以下のリンクにmavenのライブラリを使ったサンプルコードがあるので、ご興味あれば参照してみてください。

https://github.com/confluentinc/examples/tree/5.2.1-post/clients/avro/src/main/java/io/confluent/examples/clients/basicavro

(セキュリティ)Kafka クラスタと Schema Registry 間の通信の TLS 暗号化およびクライアント認証

https://docs.confluent.io/current/kafka/encryption.html#encryption-ssl-schema-registry

Schema Registry はスキーマの保存先として、Kafka を利用します。その際、一クライアントとして、Kafkaクラスタに接続します。もし、Kafka 自体に TLS 暗号化やクライアント認証などを導入している場合は、Kafka クラスタと Schema Registry 間の通信においても同様のセキュリティを導入する必要があります。

TLS暗号化やクライアント認証の方法は、一般的なクライアントと同様です。詳細はリンク先を参照ください。

(セキュリティ)REST API エンドポイントに HTTPS を導入する

Schema Registry の REST API エンドポイントを HTTPS にしたい場合は、以下のURLが参考になります。エンドポイントの定義のほか、keystore / truststore などの設定が必要です。

https://docs.confluent.io/current/schema-registry/security.html#clients-to-schema-registry

(セキュリティ)スキーマアクセスの認可

限られた用途で Kafka と Schema Registry を使う場合は問題ありませんが、不特定多数の人にクラスタを提供する場合は、スキーマが自由に変更される状態では困ると思います(謝って変更してしまったためにシステム障害が起きるなど)。

どのクライアントがどの操作(REST API上のリソース)が可能かは、ACLによって指定できます。これは、Kafkaクラスタ本体のACLとほぼ同じインタフェースになっており、Kafka自体の認可を設定していれば非常に分かりやすいと思います。

なお、当然ですが、認可を導入するには認証できている必要があります。

Schema Registry Authorization — Confluent Platform

Schema Registry ACL Authorizer — Confluent Platform

おわりに

今回は、Schema Registry と Avro Schema について簡単にご紹介しました。 大規模に使い始めると、スキーマを統一的に管理したいというニーズは高まると思います。私自身も会社で共有する Kafka クラスタを運用しており、スキーマを統一的に管理したいなと思い、Schema Registry を調べ始めて、今回のブログを書くに至りました。 もし、Schema Registry を調べている人の参考になれば幸いです。