『データ指向アプリケーションデザイン ――信頼性、拡張性、保守性の高い分散システム設計の原理』Part 3 導出data
Part3 導出data
目的
- 複数の異なるdata systemを結合
→ 一貫したapplication archtectureにすることのproblemを検証
ⅲ-1 Systems of Recordと導出data
- dataを保存し処理するsystemの2分類
- Systems of Record (a.c.a. Source of truth)
- normalized(正規化)
- 導出data system
- cacheなど
- 非正規化 value, index, materialized viewも
- original sourceから再生成可能のもの
- 冗長.but, readのperformanceのために必要
- denormalized
- Systems of Record (a.c.a. Source of truth)
- system内のdata flowの明確のために,区別はvery beneficial
- systemの各部のI/Oと依存関係の明確化
- dbは単なるtool
- → application内でどう使うかで分類が決まる
- dataを保存し処理するsystemの2分類
Ch10 batch processing
目的
- Part 1, 2で,request, query, responce, resultを議論
- 多くのrecent data systemのpreposition
- online system: responce timeが重要
- Web, HTTP/REST baseのAPIで,request/responce styleがgeneral
- ↑→ ほかのapproachにもメリットあり
- 3つのsystem type
- Service(Online System)
- requestとresponce
- responce time, 可用性が重要
- batch processing System(Offline System)
- 大量の入力dataをprocessingするJobをregular実行
- throughput で測る
- stream processing system(near line processing, 準real time system)
- 入力から出力生成.online processingとbatch processingの中間
- not request/responce
- eventはすぐprocessing ←→ batch jobは固定集合に実行
- → latency低い than batch system
- Service(Online System)
- trusted scalable(maintainable) systemのために,batch processingは重要なbuilding block
10.1 Unixのtoolによるbatch processing
目的
- simpleなe.g.
- request, processingのたびにlog fileに行を追記
対応
- !1 simple logのanalyze
- !2 Unixの哲学
- 「programをpipeでつなぐ」
- automation, rapid prototyping, incremental iteration, experiment(実験)への親和性, 大規模projectの管理可能なchunkへの分割
- !!1 一様なinterface
- 全programが同じinterface
- @Unix: file(file descripter) ← orderを持つbyteの並び
- ASCII textのinterfaceはnot beautiful
- e.g. △{print $7}より,〇{print $request-url}
- 結合性が重要
- 全programが同じinterface
- !!2 logicと結線の分離
- stdin, stdout
- 疎結合(loose coupling), late binding, inversion of controlの1形態
- → 入出力の結線をlogicから分離
- → 小さいtoolから大きいsystemを合成
- → 入出力の結線をlogicから分離
- !!3 transparencyとexperiment
10.2 MapReduceとdistributed file system
目的
- MapReduceとUnix toolsは似ている
- MapReduceのJobは,distributed file system上のfileをread/write
- Hadoopでは,HDFS(Hadoop Distributed File System)という,Google File System(GFS)のOpenSourceの再実装
- GlusterFSや,Quantcast File System(QFS)もある
- Amazon S3, Azure Blob Storage, Open Stack SwiftなどのObject Storage Serviceも似ている
- HDFSは,shared nothingの原則に基づく
- ↑→ NAS(Network Attach Storage), SAN(Storage Area Network)のarchtectureでの共有discのapproachと対照的
- customのhardwareや,fiber channelなどの特殊なnetwork infra必要
- ↑→ 特別なhardware不要.datacenter内のnetworkでつながったcomputer群があればOK
- ↑→ NAS(Network Attach Storage), SAN(Storage Area Network)のarchtectureでの共有discのapproachと対照的
- HDFS: network serviceを公開
- name nodeという中央Serverが,file blockingのmachineへの保存状況を追跡
- → 全machineのdisc領域から,1つの巨大なfile systemを生成
- リードソロモンcodeのようなerasure codingで,完全なreplicationによりoverheadを抑える
対応
- !1 MapReduce Jobのexecution
- !2 Reduce側での結合とgroup化
- 結合: {foreign key, document reference, 辺}の両端のrecordにaccessしなければならないcodeで必要
- MapReduceのjobはfull table scan
- analytic queryでは〇
- especially, 複数マシンにわたってconcurr
- → dataset中の何らかの関連をすべて見出す
- analytic queryでは〇
- !!1 事例: userのactivity analyze
- 図10-2: batch jobでの結合の例
- MapReduceを使って関連するrecordsをすべて同じところにまとめ,effectiveにprocessing
- !!2 sortmerge 結合
- 図10-3: flow
- secondary sort: reducer内でさらにsort
- mapperの出力がkeyでsort → reducerがmerge
- 図10-3: flow
- !!3 関連するdataを同じ場所にまとめる
- !!4 GROUP BY
- e.g. sessionization
- mapperがgroup化のためのkeyを使う
- !!5 skewのprocessing
- linchpin object: 不均衡なactive db record
- a.c.a. hot key
- → skew(hot spot)が生じる
- → 対応
- e.g. Pigでのskewed join method
- hot keyを扱うprocessを複数reducerに分散
- → concurr度up
- crunchでのsharded joinも同様だが,explicitなhot keyの指定必要
- hotspotの緩和のためのrandom化と似ている
- Hiveでは別の方法
- map-side join
- hot keyを扱うprocessを複数reducerに分散
- hot keyでのgroup化では,2つのstageに分割
- e.g. Pigでのskewed join method
- linchpin object: 不均衡なactive db record
- !3 map-side join
- joinを高速化
- reducerなし.sortなし
- !!1 broadcast hash join
- 小さい入力が大きな入力のすべてのpartitionにbroadcast
- hash: 小さい入力をinmemory hashtableにloadする
- local disc上のread onlyのindexにstoreも〇
- Pig, Hive, Cascading, Crunchで使える
- Impaleなどの,DWH query engineでも使われている
- !!2 partition化 hash join
- joinのinputがどちらも同じkey, 同じhash functionに基づいて同じ数のpartitionを持っているときのみ使える
- Hiveでのbucketed map join
- e.g. 先行するMapReduce jobでinput data generationのcase
- joinのinputがどちらも同じkey, 同じhash functionに基づいて同じ数のpartitionを持っているときのみ使える
- !!3 map-side merge join
- partition化を同じkeyでsortedのcase
- 先行するManagement jobがproceededのcase
- !!4 map側でのjoinを行うMapReduceのworkflow
- map-side joinでは,distributed file system中のdatasetのphysical なlayoutが重要 ← preposition 名詞のため
- → Hadoopのecosystem中では,これらのmetadataをHCatalogやHiveのmetastoreで管理
- !4 batch workflowのoutput
- batch processingは,OLTPでもOLAPでもない
- !!1 search indexの構築
- 固定されたdocument群への全文検索には,index作成のためのbatch processingはeffective
- !!2 batch processingのoutputとしてのK=V store
- search index以外のe.g. of batch processing workflowのoutput
- classifyerのようなmachine learning systemや,reccommendation systemの構築
- batch processingからのoutputのapplicationが使うDBへの適用方法
- 全く新しいDBをbatch job内部で作成
- Voldemort, Terrapin, ElephantDB, Hoosebulk loadingなどでsupported
- → distributed file system中のjobのoutput directoryにwrite out
- → read onlyのqueryをprocessするServiceへbulk load 〇
- こういったDB fileの構築: MapReduceの良い利用法
- WAL(Write-Ahead Log)も不要
- 全く新しいDBをbatch job内部で作成
- search index以外のe.g. of batch processing workflowのoutput
- !!3batch processingのoutputでの哲学
- Unixの哲学と同じ
- inputはimmuteで副作用を避け,batch jobのperformance up + maintenanceはるかに楽
- human fault toleranceあり ← ↓rollback 容易
- minimizing irreversibility (回復不能性の最小化)
- automatic retryが安全
- 同じfile集合を多様なjobのinputに使える
- logicを結線(in/out directory)からisolation
- 関心の分離. codeのreuse〇
- inputはimmuteで副作用を避け,batch jobのperformance up + maintenanceはるかに楽
- Unixとは異なる点
- Hadoopでは,よりstructuredなfile formatを使い,parse processingを省略可能
- ↑ effectiveなschema baseのencodingを提供
- schemaを進化させられる → Avro, Parquetがよく使われる
- Unixの哲学と同じ
- !5 Hadoopとdistributed datacenterの比較
- Hadoop: Unixのdistributed versionのよう
- !!1 storageの多様性
- Hadoopは,無差別にdataをHDFSに書き込み,後からprocessingの方法を見つければよいという可能性を打ち出した
- ↑→ MPP DBでは,db originalのstorage formatにインポートするため,あらかじめdataとquery patternをmodeling要
- DWHと似た考え.: data を生のままcollect
- 全く異なるdatasetのcollection speed高い: Datalake, enterprise data hub
- DWHと似た考え.: data を生のままcollect
- dataの解釈は,consumerのproblem ← schema on read
- → 任意の変換可能
- → sushi principle: dataは生が〇
- → Hadoopは,ETL(Extract-Transform-Load) processingの実装に使われてきた
- ↑ distributed file systemが任意のformatでのdata encodingをサポート.
- → 任意の変換可能
- !!2 processing modelの多様性
- !!3 often faultに備えた設計
- MapReduce とMPP dbの違い
10.3 MapReduceを超えて
目的
- batch processingのMapReduce以外の選択肢を見る
対応
- !1 中間的なstateの実体化
- 実体化 ←→ streaming(: unixのpipe)
- 実体化のproblem(MapReduceのproblem)
- タスクの待ち
- mapperは冗長
- temporary dataのreplicationは無駄
- !!1 data flow engine
- MapReduceのproblemに対応
- e.g. Spark, Tez, Flink
- workflow全体を1つのjobとして扱う
- 複数stageのdata flowをexplicitにmodel化: data flow engine
- 入力をpartitioning → taskをconcurr
- mapやreduceはなく,operatorを使う
- operatorの接続のchoicesとmerit
- map, reduceと同じ処理のcodeは,修正鳴くほかのengineでも使える
- MapReduceのproblemに対応
- !!2 fault tolerance
- !!3 実体化についての議論
- !2 graphとiterativeなprocessing
- graph全体へのoffline processingやanalyze
- → recommendation engineやranking systemといったmachinelearningのapplicationで生じる
- e.g. PageRank
- data flow wngは,directed acyclic graph(DAG)内にoperatorを置くが,operator間のdataglowのみgraph化していて,data事態はrelational styleのtaple
- 図2-6: transitive slosure(推移的閉包)のようなalgorithm
- → iterativeなstyleで実装
- Managementで実装はnot effective
- !!1 Pregelのprocessing model
- batch processing graphのoptimise: 演算processingのbulk sync parallel (BSP) modelが広がる
- ある頂点がほかの頂点にmessageを送信
- 頂点が自分のstateをiteration内でmemoryに記憶する
- → functionは,新しいmessageのみproceedでOK
- actor modelとの違い 0 頂点のstateと頂点間のmessageにfault-toleranceと永続性あり
- !!2 fault-tolerance
- Pregel jobのperformance高い: messageをbatch化して通信待ち時間削減
- → iteration間でのみ末
- 透過的なfaultからのrecovery
- ↑ iterationの終わりに全頂点のstateのcheckpoint processing ← regularly
- Pregel jobのperformance高い: messageをbatch化して通信待ち時間削減
- !!3 concurrency
- Pregelのprogramming model: 1度に1つの頂点のみ扱う
- graphのalgorithm: 大量マシン間の通信のoverheadあり → distributed graph algorithmのspeed 大きく減少
- → 単一machine上のalgorithm > distributed batch processing @ performance
- GraphChiなどのframeworkを使う単一machineでのprocessingが〇
- Graph algorithmのconcurrencyは研究中
- !3 高レベルAPIと様々なlanguages
- programming modelの改善, processing effectivityの改善
- これらの技術が扱えるproblem領域の拡大に目が向く
- Hive, Pig, Cascading, Crunchといった高レベル languageやAPIの登場
- Tezにより移行〇
- SparkSQL, Flink ← Flume Javaから
- これらのdataflow APIは,relational styleのbuilding blockで演算処理を表現
- code少ない + interactive
- !!1 宣言的なquery languageへの移行
- relationalなoperatorの指定 → frameworkがjoinのinputの性質を分析
- → タスクにとっていずれのjoin algorithmが良いかautomatic judge〇
- e.g. Hive, Spark, Flink
- ↑→ joinを宣言的に指定 → query optimiserがjudge
- ほかのSQLの完全な宣言的query modelとの違い
- relationalなoperatorの指定 → frameworkがjoinのinputの性質を分析
- !!2 various領域への特化
- e.g. 統計,数値処理のalgorithm for classificationやrecommendation systemといったmachine learningのapplication
- reuse〇な実装の出現
- e.g. Mahout, MADlib
- e.g. 空間algorithm
- e.g. k近傍鳳(k-nearest neighbors), 近似(approximate) search for genome analyze
- → algorithmのdistributed executionのために,batch processing engineが使われる領域は広がっている
- batch processing systemの機能や高レベルの宣言的operatorが増え,MPPdb がprogrammingし易く柔軟になることで,似てきている.
- どちらも単なるdataを保存し処理するsystemに過ぎない
- e.g. 統計,数値処理のalgorithm for classificationやrecommendation systemといったmachine learningのapplication
- programming modelの改善, processing effectivityの改善
まとめ
- batch processing について
- distributed batch processing frameworkで解決する問題
- partitioning
- 関連するdataをすべて同じ場所にまとめる
- fault tolerance
- 中間stateの扱い.影響
- partitioning
- MapReduceでのjoinのalgorithmは,MPP DBやdata flow engineの内部でも使われている.
- sortmerge join
- broadcast hash join
- partitionかhash join
- distributed batch processing engine(framework): 制限あるprogramming model
- → 副作用minimum
- → distributed systemのproblemを隠蔽〇
- → retry安全
- frameworkにより,fault-tolerance〇
- 完治不要.強力なtrustnessのsemantics
- especially important: inputdataを変更しない
- input dataは有限.既知のサイズ. → 官僚のjudge可能
- stream processing
- 非限定stream
- stream processing
- input dataは有限.既知のサイズ. → 官僚のjudge可能
Ch11 Stream processing
目的
- data managementの仕組みとして,event streamを見る
- incrementalにprocessされるdata
11.1 event streamの転送
目的
- event: recordを指す
- 自己完結したimmutableなobject
- producer(a.c.a. publisher)が1回生成し,複数consumer(subscriber)が扱う
- file名ではなく,topic, streamとしてgroup化
- fileかdbがあれば,producerやconsumerとconnect可能
- pollingではなくpublishが〇
- これまでのdbにない機能のためのtoolがdeveloped
- triggerなどは限定的
- これまでのdbにない機能のためのtoolがdeveloped
対応
- !1 messaging system
- eventの通知に使う
- UnixのpipeやTCP connectionにより拡張
- → ・複数producerが同じtopicにmessage 送信
- ・複数consumerが同じtopicにmessage 受信 → publish/subscribe model
- 送信 > 受信のprocessing @speedのcase
- messaging systemがdrop/queueでのbuffering/ back pressure(flow control)
- node crashやofflineのcase
- messaging lost okか,applicationに依存
- batch processing systemでは,強いtrust〇
- → streamingでも提供する
- !!1 producerからconsumerへのdirect messaging
- latencyの低さがimportantな金融 → UDP + applicationでのrecovery
- ZeroMQやnano messageなど,brokerなしのmessaging library: TCP/IP multicast上でproducer/consumer messagingを実装
- StatsDやBrubeck: UDP messaging
- consumerがnetwork上にservice公開 → producerはHTTPやRPC requestを発行して,messageをpush
- webhooksのbackground
- これらのdirect messaging systemは,message lostの可能性を, applicationのcodeが関知する必要ある
- fault-toleranceは限定的
- !!2 message broker(a.c.a. message queue)
- a type of DB
- Server, producerやconsumerがClient
- → clientの出入りに耐えられる.永続性の問題も隠蔽〇
- consumerはasync by queuing
- !!3 message broker とdbのcompare
- consumerへの配信官僚 → message(data)をdelete
- working set 小さい, queue短いという前提
- → 大量messageのbufferingは全体のthroughput down
- secondary indexなどではなく,pattern matchのtopic 部分集合を取得
- dataに変化があれば,clientに通知 ←→ snapshot
- ここまでは旧来のmessage broker. JMS, AMQPといったstandard
- e.g. RabbitMQ, ActiveMQ, HornetQ, Qpid, TIBCO Enterprise, IBMMQ, Azure Services Bus, Google Cloud Pub/Sub, Message Service
- !!4 複数consumer
- 図11-1: load balancingとfanout: messagingの2つのpattern
- load balancing
- どれか1つのconsumerに配信
- a.c.a. shared subscription
- どれか1つのconsumerに配信
- fanout
- 全consumerにdeliver.独立した複数consumer
- 2 patternの組み合わせも〇
- !!5 承認と再deliver
- 承認(acknowledgement) → messageのdelete @ broker
- 図11-2: order変化 @ load balancing
- → consumerごとにqueuingが必要なpattern
- !2 partition化されたlog
- dbでの複数のあるstorageというapproachと,messagingでの低latencyのnotify機能のcombine
- → log base message broker
- !!1 message storageへのlogの利用
- !!2 従来のmessagingとlogの比較
- log baseのapproach → fanout型のmessaging support easy〇
- load balancingは欠点アリ
- → message processing負担大きいことあり, message単位でparallel processing, messageのorder not importantなら
- → JMS/AMQP styleのmessage brokerが望ましい
- ↑→ message processingのthroughput高い必要がある,1つのmessageは高速, orderがimportantなら,log base applicationが◎
- → JMS/AMQP styleのmessage brokerが望ましい
- log baseのapproach → fanout型のmessaging support easy〇
- !!3 consumerのoffset
- brokerはacknowledgementの追跡不要: 管理のoverhead少ない
- → batchやpipelineといった手法を取るchanceが生まれる → throughput高い @ log base system
- 5.1.2のlog sequence numberと似ている
- brokerはacknowledgementの追跡不要: 管理のoverhead少ない
- !!4 disc領域の利用
- 循環buffer, ring bufferingを使う
- throughputはhistoryの量による
- !!5 producerにconsumerが追い付かないcase
- consumerのdelayが大きくなったら警告
- 各consumerは独立
- !!6 古いmessageのreplay
- AMQPやJMS styleのmessage brokerは破壊的
- ↑→ log baseのmessage brokerはread only
- → 繰り返し可能
- → maintenance〇. 組織内のdata flowとの組み合わせのためのtoolとして log baseのmessagingは〇
- → 繰り返し可能
11.2 dbとstream
目的
- DBとstreamのつながりはdisc上のlogのphysical storageをこえて,本質的なもの
- e.g. replication log
- e.g. state machine replication(9.3.3)
- heterogeneousなdata systemのproblemを,streamの着想でresolveする
- e.g. replication log
対応
- !1 systemのsyncの保持
- 複雑なapplicationの要求は,異なる技術の組み合わせで解決
- それぞれの目的にoptimiseした形式で各々がcopyをもつ
- → syncが必要
- e.g. ETL process @DWH(3.2.1): batch processing
- → syncが必要
- dual writesの重大なproblem ← dual writesは,dumpが低速なcaseの代替案
- 図11-4: race condition
- version vector(5.4.4)などが必要
- fault-toleranceのproblem
- 2 system間不整合
- 図11-4: race condition
- それぞれの目的にoptimiseした形式で各々がcopyをもつ
- 複雑なapplicationの要求は,異なる技術の組み合わせで解決
- !2 変更dataのcapture(Change Data Capture, CDC)
- 図11-5: flow. 変更のstreamをproducerとconsumerで扱う
- producerは記録するsystemのdb, consumerはsearch indexやDWH
- !!1 CDCのimplement
- logのconsumerは導出data system
- ↑ CDCはconsumerが正確なcopyとなることを保証
- dbのtriggerが使える
- ただし,triggerは壊れやすく,performanceでの大きなoverhead
- implementのe.g. → p.498
- CDCはasync
- !!2 初期のsnapshot
- dbのsnapshotと変更ログの対応必要
- !!3 logのcompaction
- snapshotの代替
- log baseのmessage brokerやCDCでも〇
- e.g. Apache Kafka
- message brokerを一過的messagingのみならず,永続性あるstorageとしても使える
- !!4 change streamのためのAPI support
- change stream: first classのinterfaceとしてDBでsupport
- e.g. RethinkDB, Firebase, CouchDB, Meteor, VoltDB, Kafka Connect
- change stream: first classのinterfaceとしてDBでsupport
- 図11-5: flow. 変更のstreamをproducerとconsumerで扱う
- !3 event sourcing
- event sourcingとCDCの違い
- (event streamingはDDDのcommunityでdeveloped)
- event sourcingでは,event storeには追記のみ
- ↑→ CDCはapplicationのdbをmutableに使う
- eventは低レベルの変化でなく,application levelで起きたことを反映
- event sourcingは, applicationの進化に〇. bugからguard
- event sourcingはchronicle data modelとsimilar. star schemaのevent logとfact tableにも掃除
- event sourcingのstreaming systemとの関連を見る
- !!1 event logからのnow のstateの導出
- 決定的なlogでlog → state
- logのstoreのproblem
- 決定的なlogでlog → state
- !!2 commandとevent
- commandがvalidated → event
- event: fact. logのimmutableな一部
- → commandのvalidationはsync 必要
- serializableなtransactionの利用
- or, eventの分割
- → commandのvalidationはsync 必要
- event: fact. logのimmutableな一部
- commandがvalidated → event
- event sourcingとCDCの違い
- !4 state, stream, immutability
- immutabilityの原則: event sourcingやCDCの強さ
- dbはapplicationの現在のstateをstore
- readにoptimise
- stateの変化とimmutabilityとの折り合い
- stateの変化: eventのresult
- mutableなstateとimmutableなeventからなる追記飲みのchange log
- log が矛盾しないことがimportant
- → change log: stateの進化の表現
- 図11-6: applicationのstateとevent streamのrelation
- log: fact
- db: logのcache
- !!1 immutableなeventのmerit
- 監査制,problemのrecovery,分析
- !!2 event log からの複数viewの導出
- mutableなstateをimmutableなevent log からisolation
- event logからdbへのexplicitなconvert step
- → 時間とともにapplicationの進化〇.新旧共存
- dataのwrite formatとread formatのisolation → 大きな柔軟性
- ⇔ CQRS(Command Query Responsibility Segregation
- → 正規化,非正規化はほぼ無意味
- ↑ convert processがreadにoptimiseされたviewとevent logと整合性を保てば,viewでdataを非正規化〇
- !!3 concurrency control
- event sourcingとCDCのasyncの欠点
- → 5.2.1(p.173)で解決
- or → stateをevent logから生成
- event sourcingでは,user actionの自己完結した記述: event
- 同じpartitionなら,single threadで処理〇
- !!4 immutabilityのbound
- Datomicのexcision(切除)や,Fossilのshunning(回避)が必要なcaseあり.
11.3 streamのprocessing
目的
- streamをprocessして,new導出streamをgenerate
- operator, job
- streamは終わりなし
- → sort merge join×.fault toleranceの仕組み別に必要. ← crash時に初めからは×
対応
- !1 stream processingの利用.
- stream processing は,monitorのために使われてきた
- alertのnotify
- 洗練されたpattern matchingとの関連付け必要
- ほかの使い方の出現
- !!1 複合event processing(Complex Event Processing, CEP)
- !!2 stream analyze
- !!3 materialized viewの管理
- applicationのstateもmaterialized viewの1つ
- ただし,全event必要.log compactionも必要
- applicationのstateもmaterialized viewの1つ
- !!4 streamでのsearch
- streamのsearch: queryをstore → CEPのようにdocumentがqueryを通る
- queryのindexづけ
- !!5 message passingとRPC
- RPC的systemとstream processing間は重なる領域もあるが,そもそもは異なる
- stream processing は,monitorのために使われてきた
- !2 timeに対する考察
- localのsystem clockでwindowを決定
- simple
- but, eventのgenerationとprocessingの間が小さいときに限る
- !!1 event timeとprocessing time
- 図11-7: processing timeでwondow processしたときのproblem
- !!2 preparedのcheck
- はぐれ(straggler) event のproblem
- 虫 or correction
- はぐれ(straggler) event のproblem
- !!3 結局,どのclock?
- 3つのtimestampをlogging
- eventが実際に起きたtimeの推定必要
- !!4 windowのtype
- Tumbling window
- 固定長,全eventがいずれか1つのwindowにのみbelong
- Hopping window
- smoothingのためにwindow同士が重なる
- sliding window
- ある期間が動く.長さのみ決まっている
- session window
- 期間非固定
- 時間的近さ
- web siteのanalyzeに使う
- Tumbling window
- localのsystem clockでwindowを決定
- !3 stream のjoin
- stream processing: data pipelineを限度ないdatasetのincrementalなprocessingに一般化したもの
- → joinが同じく必要
- !!1 stream-stream join(window join)
- stream processorがstateを管理する必要
- !!2 stream-table join(streamのenrich)
- stream processorがもつdbのlocal copyの最新化が必要
- → CDCでresolve〇
- stream-stream joinとの違い: stream-table joinでは,tableのchange log streamに対して,新versionのrecordで旧recordをoverwrite. with 「時間の始まり」までのwindow
- stream processorがもつdbのlocal copyの最新化が必要
- !!3 table-table join(materialized viewのmaintenance)
- cacheの使用が例
- !!4 joinのtimeに対する依存
- いずれのjoinもstream processorがjoinへのinputの1つに基づくstateを管理
- → stateを管理するeventのorderがimportant
- Slownly Changing Dimension(SCD)のproblem
- → joinするrecordのversionごとにidを使う
- joinはdecisive, but logのcompaction不可能
- → joinするrecordのversionごとにidを使う
- stream processing: data pipelineを限度ないdatasetのincrementalなprocessingに一般化したもの
- !4 fault-tolerance
- exactly-once semantics(事実上effectively-onceがより正しい) @batch job
- streamはより複雑な対処必要
- !!1 micro batch processingとcheckponit processing
- !!2 atomicなcommit再び
- !!3 冪等性(idempotence)
- e.g. Kafka, StormのTrident
- いくつかの前提必要
- わずかなoverheadでexactly-once semanticsをrealized〇
- !!4 fault後のstateのrebuild
- stateをstream processorのlocalにstore, and regularly replication
- e.g. FlumeJava, Samza, Kafka Streams, VoltDB
- infraのperformance特性に依存
- exactly-once semantics(事実上effectively-onceがより正しい) @batch job
まとめ
- event streamについての目的とprocessing方法
- unboarder streamをprocessing ←→ batch processing
- message brokerとevent logの使用
- AMQP/JMS styleのmessage broker
- log baseのmessage broker
- Streamとしての表現〇のe.g.
- change log, CDC, event sourcing, log compaction
- dbをstreamとして表現
- index, cache, analysis systemのような導出data system
- streamとしてstateをmanagement, messageをreplayする機能
- stream のjoinやvarious stream processingのframeworkのfault-tolerance実現のための手法の基礎
- stream processingの目的
- event pathのsearch(CEP), Window集計の演算(Stream analysis), materialized view
- stream processorのtimeの扱い
- stream processingでのjoin
- stream-stream join
- stream-table join
- table-table join
- stream processorでのfault-toleranceとexactly-once semanticsの実現
Ch12 data systemの将来
目的
- Ch11まで: 現在の姿.Ch12: あるべき姿
- 本書の目標: trusted, scalable, maintainableなapplicationやsystemの構築方法を見る
- Ch12の目標: 頑健性,正確性,進化性,人類への貢献において,よりよいapplicationを設計する方法の発見
12.1 data のintegration
目的
- software productsと,適した環境の対応付けを見出す
- 複数softwareのcombine
対応
- !1 dataの導出による特化したtooldのcombination
- data joinの必要性は,組織全体のdata flowを考える必要あり
- !!1 data flowについての考慮
- 入出力の明示
- 全てのwriteのorderを決定する単一system
- 全order決定の原理がより重要.than which CDC or event sourcing log
- 導出data systemをevent logにもとづいてupdate: 決定的で冪等〇
- faultからのrecovery容易
- !!2 導出dataとdistributed transaction
- various data system間で一貫性を保つための古典的approach: distributed transaction
- distributed transactionと導出data systemのcost comparing
- lockかlogか
- atomic commit or 決定的retryと冪等性
- transaction: linearizabilityを提供 ←→ 導出data system: timingの保証なし
- ただし,transactionのコスト大きい
- XAのfault-toleranceとperformance特性は貧弱
- → distributed transactionの有用性は限定的
- XAのfault-toleranceとperformance特性は貧弱
- various data systemのjoinは,導出data system(log based)が〇
- !!3 全orderのboarder
- 小さなsystemのみ,全orderのevent log 〇
- 全order broadcast(consentと等価)は,orderingを単一のcodeで行う
- 複数nodesでのconsent algorithmのdesignは未解決
- !!4 因果律の把握のためのeventのordering
- simpleなanswerなし
- logical timestamp
- systemのstateをlogにwrite
- conflict resolveのalgorithm
- simpleなanswerなし
- !2 batch processingとstream processing
- data joinの目標: dataを適切なところに適切なformatでおく
- これを達成のためのtool: batch/stream processor
- extract datasetを出力
- これを達成のためのtool: batch/stream processor
- Sparkはstreamをmicrobatchに分割して,batch processing engine上でstream processing
- Apache Flinkは,batch processingをstream processing engine上でexecution
- → 違いがぼやけてきている
- !!1 extracted stateの管理
- batch processing: function型programmingと同じにおい
- stream processingは,operatorがfault-tolerantな管理されたstateを扱う
- I/Oがdefinedな決定的functionの原則
- fault-tolerant〇
- 組織内のdata flowの考慮をsimplize〇
- search index, statistics model, cacheなど,extracted data systemを考えるときに考えること
- dataset extractionのpipeline, function型application codeを使ったsystemからのstateの変更のpush, extract systemへの変更の適用
- asyncな管理 → fault-tolerance〇,trusted, scalable
- !!2 applicationの進化に伴うdataのreprocessing
- reprocessing: datasetを全く異なるmodelにrebuild〇,新しい要求への対応〇
- 比喩: dual gage
- 段階的進化〇 → systemの改善speed up
- !!3 lambda archtecture
- batch processingとstream processingのcombine
- 核の着想: input data はevent sourcingのようにimmutableなeventを常に成長し続けるdatasystemに追加することで記録すべき
- Hadoop MapReduceのようなbatch processing systemと,Stormのようなstream processing systemという2つの異なるシステムをconcurr〇
- eventからは,readにoptimiseされたviewをextract
- batch,とstreamのいいとこどり,but 現実的なproblemあり
- batch/stream processingともに,logicのmaintenance必要
- stream pipelineとbatch pipelineの各出力のマージ難しい
- batch layerが複雑化
- !!4 batch processingとstream processingの統合
- data joinの目標: dataを適切なところに適切なformatでおく
12.2 databaseをときほぐす
目的
- db Hadoop OSはいずれもinfo management system
- Unix: especially 低レベルなhardwareのabstraction
- ≠ relational db: disc上のdata structure, concurrency, crashからのrecoveryなどの複雑さを隠蔽する,高レベルのabstraction
- 両者の良い部分をcombineする: 目標
対応
- !1 data storage technologyのcombination
- dbの機能,動作
- secondary index, materialized viwe, replication log, 全文検索index
- !!1 indexのgeneration
- 既存のdataに対する新しいviewとしてindexをextract
- !!2 すべてにかんするmeta db
- 組織全体にわたるdata flow: 1つの巨大なdb
- batch stream, ETL のprocess
- federated db: readのintegration(a.c.a. polystore)
- 統一されたquery interfaceの提供
- e.g. PostgreSQLのforeign data wrapper
- unbundled db: writeの結合
- 機能の解体(unbundle)
- 組織全体にわたるdata flow: 1つの巨大なdb
- !!3 processingのunbundle
- 複数system間のwriteの動機は要考慮.technology的にdifficult
- 冪等なwriteのasync event log >>> distributed transaction
- はるかに頑健で実践的
- transaction protocolがnot standarizedで統合とても難しい
- log baseの結合: 疎結合なcomponentsにできる〇
- @system level, @human level共に
- !!4 unbundled systemとjoined system
- 解体と合成が必要なのは,すべてのrequireを満たす単一のsoftwareがないときのみ
- !!5 欠けているものは何か?
- Unixのshellのようなものがない
- update cacheの事前calculation
- differential data flowが研究中
- dbの機能,動作
- !2 data flow 中心のapplication design
- database insideout approachというdesign patternの中身
- O2やJuttleなどのdata flow language
- FRP(Elmなど)
- BloomのようなLogical Programming Language
- Unbundlingは,Jey Krepsが提唱
- !!1 extract functionとしてのapplication code
- custom codeは多くのdbで苦労を伴う
- !!2 application codeとstateのisolation
- dbはapplication developmentのrequireを満たさない
- systemの一部として永続性あるdata storageに特化した部分を持ち,ほかはapplication codeの実行に特化が〇
- web applicationは,stateをもたないserviceとしてdeploy
- → stateはdb
- → stateのないapplication logicをdb(state management)からisolation
- applicationからはsubscribeがdifficult. (e.g. observer pattern)
- dbでもpolling必要
- !!3 data flow: stateの変化とapplicationのcodeとの相互作用
- 1980's のtaple 空間 model: stateの変化の監視と藩王のprocess
- dbのunbundle
- e.g. cache, 全文検索index, machinelearning, analysis system
- extracted dataの管理は,async jobのexecutionとはdifferent
- stateの変更のorderがimportant
- fault-toleranceがimportant
- → stream processorで実現可能
- applicationのcodeはstreamのoperatorとして動く
- stream processorを,data flow周りに大規模systemをつくるために合成〇 ←→ dbでは不可能. about 任意のcode execution
- !!4 stream processorとservice
- application development style: serviceの集合に機能を分割
- → 疎結合で組織的scalability〇
- streamのoperatorを合成は,同じ特徴あり〇
- ただし,layerのcommunicationの仕組みは大きく異なる: 1方向のasync message stream
- fault-tolerance + performance 高い〇
- RPCの代わりに,stream joinが◎
- ただし,いずれにせよ時間への依存性の対応が必要
- RPCの代わりに,stream joinが◎
- application development style: serviceの集合に機能を分割
- database insideout approachというdesign patternの中身
- !3 extracted stateの監視
- write path: extracted datasetを最新のstateに保つprocess
- e.g. 図12-1 ← 先行evaluation
- 後にread pathがくる ← 遅延evaluation
- extracted dataset: write/read pathの接点
- !!1 materialized viewとcache
- 全文検索indexのe.g.
- 頻出queryの有限集合にのみ,事前calculation: cache, materialized view
- → cache, materialized, indexは,extracted datasetの位置(path間のboarder)をずらしている
- 全文検索indexのe.g.
- !!2 statefulでoffline動作できるclient
- clientがstateを持つapplicationの出現
- offline firstのapplication
- device上のstate: server上のstateのcache
- offline firstのapplication
- clientがstateを持つapplicationの出現
- !!3 clientへのstateの変化のpush
- !!4 e2eのevent stream
- stateを持つclientやUIをdevelopするためのtools
- e.g. Elm Language, FacebookのReact toold, Reduxなど
- 内部的なclientのstateをmanagement, with event stream の subscribe
- instant messagingやonline gameなどで,このような「realtime」archtectureを使っているが,ほかのapplicationにも使える
- → request/responceのinteractionではなく,data flowのpublish/subscribeに移行必要
- → 反応性の良いUIとoffline supportの向上〇
- stateを持つclientやUIをdevelopするためのtools
- !!5 readもevent
- requestのprocessingと,joinは本質的に似ている
- userのactionまでのreadのrebuild可能〇
- → 因果関係の追跡〇
- but, storageとI/O cost増
- → 因果関係の追跡〇
- !!6 複数 partitionにわたるdataのprocessing
- readのevent化は,複雑なqueryのdistributed executionの可能性になる
- e.g. Stormのdistributed RPC機能
- e.g. twitterであるURLを見た人数の計算,不正detection
- e.g. Stormのdistributed RPC機能
- MPP dbでのinner query execution graphも似た特徴あり
- queryをstreamとして扱う: 大規模applicationの実装の選択肢の1つ
- readのevent化は,複雑なqueryのdistributed executionの可能性になる
- write path: extracted datasetを最新のstateに保つprocess
12.3 正確性を求めて
目的
- trustedで正確なapplication構築
- consistencyのdefineは不明瞭
- transactionは最後の手段ではない
- applicationのcodeがdbの機能を正しく使う必要あり
- weak isolation levelやquorumなど,間違い起き易い
対応
- !1 dbのe2e論
- applicationのbugのcase
- → immutableが〇
- !!1 exactly-once
- 冪等〇
- with metadata, fencing
- 冪等〇
- !!2 duplicationの抑制
- 2PCはnot enough
- !!3 操作id
- networkのcommunicationを複数hop 経由する操作を冪等にする
- ↑requestのe2eのflowを考える必要
- not serializableなisolation levelでは,unique(→ 7.2.4, p.266)性checkに難あり
- event logとしてのrequest table → 例12-2
- networkのcommunicationを複数hop 経由する操作を冪等にする
- !!4 e2e論
- duplicationの抑制は,communication system自体の機能では不可能
- → transaction IDが,end userのclientからdbまで渡される必要
- e2eはnetworkでのdataの整合性にも必要
- encryption
- 低レベルの機能の上に成り立つ
- duplicationの抑制は,communication system自体の機能では不可能
- !!5 data systemへのe2eのapply
- 高levelのabstractionにenoughな方法まだない
- transactionもnot enough
- 高levelのabstractionにenoughな方法まだない
- applicationのbugのcase
- !2 制約の強制
- unique性などの制約を守らせる
- !!1 unique 制約にはconsent必要
- unique必要なvalueをpartitionに分ける
- async のmulti master replicationは例外
- !!2 log baseのmessagingでのuniqueness
- log: total order broadcast = consentと等価
- partition数を増でscale〇
- いずれの制約にもsequencialにprocess可能
- basic 原則: conflictable writeは同じpartitionに送られ,sequencialにproceedされる
- conflictのdefineはapplicationに依存しうるが,stream processorは任意のlogicを利用可能
- log: total order broadcast = consentと等価
- !!3 multi partitionのrequest processing
- partitioningされたlogを使い,atomicなcommitなしで同等のaccuracy〇
- 単一objectへのwriteは,ほぼすべてのdata systemでatomic
- → 複数partitionにまたがるatomic commit不要
- 「複数partitionにわたるtransactionを,partitioningの方法が異なる2stageに分割し,e2eのrequest idを用いる」ことで,faultがあっても,atomicなcommit protocolを用いずに同じaccuracy達成〇 → p.563, 12.2.3.6と似る
- !3 timeliness(適時性), integrity(整合性)
- transactionのよい性質: linearizable
- 複数stageのstream processorではasyncだが,出力streamのmessageをclientがwait 可能
- → syncの通知 ← 待機の目的
- consistency(一貫性)の2つの要求
- timeliness
- userが,systemの最新のstateの観察を保証
- CAP定理: 強制約,read-after-write: 弱制約
- userが,systemの最新のstateの観察を保証
- timeliness
- integrity
- dataに矛盾や間違いがないこと
- especially 正しいextract data
- 明示的なcheckとrepair必要
- ACID transactionでは,consistencyはある種のapplication固有のintegrityの表現
- atomicityとeternityが整合性を保持するための重要なtools
- dataに矛盾や間違いがないこと
- timelinessの違反: 結果整合性
- integrityの違反: 恒久的なnot consistency
- importance: integrity >>>> timeliness
- !!1 data flow systemのaccuracy
- ACID transactionはtimeline(linearizability)とintegrity(atomicなcommit)を共に保証
- event baseのdata flow systemは,timelinessとintegrityをisolate
- streaming systemの中核はintegrity
- exactly-onceは,integrityのための仕組み
- atomic commit不要
- integrityのための複数仕組みのcombination
- writeを単一のmessageで表す
- 決定的なextract function(SPのようなもの)
- clientがgenerateしたrequest idを全levelで渡す
- → e2eのduplication抑制・冪等性
- messageをimmutableにし,extracted dataをあとからreprocessing可能
- → recovery easy
- !!2 制約のゆるやかな解釈
- unique制約の強制: consistency必要
- より弱いuniquenessでenoughなcase
- compensating(補正) transaction
- linearizability不要なcase
- 謝罪のコストはbusiness上のjudge
- 許容, → 弱い制約
- → integrityは必要,but timelinessは不要
- !!3 調整を回避するsystem(coordination-avoidance system)
- 2つの所見
- data flow system: extracted data でのintegrityの管理をatomicなcommit, linearizability, partition間syncなしに達成〇
- 弱制約でも,integrityが〇ならok
- → coordination不要
- → 高performance, fault-tolerance〇
- e.g. multi leader構成で複数datacenterにまたがり,region間でのsync replication可能
- → 高performance, fault-tolerance〇
- serializable transactionは一部の使用で有益
- 2つの所見
- !4 信頼しつつ検証も
- system modelでの仮定
- 現実における確率的problem
- e.g. rowhammer
- !!1 softwareのbugがあってもintegrityを保つ
- !!2 約束を盲信は×
- auditing(監査): dataのintegrity check必要
- 速いうちの検出が〇
- !!3 検証の文化
- systemの自己検証・自己監査必要
- 監査性のdesign必要
- ↑weak consistencyが普通になった
- !!4 監査性のためのdesign
- transactionは, transactionの意味・理由が不明
- ↑→ event based system: よりよい監査性
- userのinputは単一のimmutable event
- → stateの更新はeventからextracted
- extractは決定的で再現可能
- → stateの更新はeventからextracted
- data flowの明確化 → dataの系統(provenance)も明確化
- → integrityのcheckはるかにeasy
- event logはhashでevent storageのcheck〇
- extracted stateは,batchやstream processorのreprocessingでevent logからstate extract〇
- → integrityのcheckはるかにeasy
- userのinputは単一のimmutable event
- → data flowがdecisiveで十分にdefined
- → systemの行いの理由のためのdebugやtraceがeasy
- bug検出時の環境再現も〇
- !!5 e2e論再び
- dataのintegrity checkはregularに必要
- e2eでのcheckが〇 → 内部もimplicitにcheck〇
- systemの変更や新storage technologyのrisk減
- → applicationの進化〇
- systemの変更や新storage technologyのrisk減
- e2eでのcheckが〇 → 内部もimplicitにcheck〇
- dataのintegrity checkはregularに必要
- !!6 監査可能なdata systemのためのtools
- encryptionのtoolsで,hardware, softwareのproblemやcrackingへのtoleranceをもつようにsystemのintegrityを証明する方法あり
- e.g. Bitcoin, Ethereum, Ripple, Stellerのような暗号通貨, block chain, distributed台帳のtechnology
- ただし,Byzantine fault体制は△(or ×)
- このtechnologyはたいていMerkle treesに依存
- integrityのcheckや監査のalgorithmを使うsystemが,scalableでperformanceのpenalty少となるように研究必要
- encryptionのtoolsで,hardware, softwareのproblemやcrackingへのtoleranceをもつようにsystemのintegrityを証明する方法あり
12.4 正しいことを行う
目的
- すべてのsystemはある目的のために構築される
- → 目的をこえた影響もあり.これにも責任必要
- dataを,人間性とrespectをもって扱う
- 今日は,倫理的選択が一層含まれる
- → 目的をこえた影響もあり.これにも責任必要
対応
- !1 予測分析(predictive analytics)
- !!1 biasと差別
- algorithmのinputのbiasに気づけない
- machinelearningは,biasをmoneyロンダリング
- algorithmのinputのbiasに気づけない
- !!2 責任と説明責任
- dataに基づく意思決定の間違いを正す方法必要
- !!3 feedbackloop
- system thinkingでriskyなfeedbackloopを避ける
- !!1 biasと差別
- !2 privacyと追跡
まとめ
- applicationは目標を満たすため,複数のvarious softwareをcombine必要
- このdata integrationのproblemを,batch processingとevent streamingで解決
- index, materialized view, machinelearningのmodel, 統計summaryなどを管理〇
- asyncで疎結合〇
- → 頑健,fault-tolerant〇
- → applicationの進化〇
- ↑ dbをcomponentsにunbundle. → dataflow applicationを構築
- offlineでも動くinterface
- e2eの操作ID → 冪等
- asyncなevent processing
- 制約をasyncにcheck → 強いintegrityの保証を実装
- 多くのbusiness processにadapt
- 制約の対応(e.g. compensation)
- applicationをdataflowを中心において構成し,制約をasyncにcheck
- → ほとんどの調整回避〇
- → 地理的にdistributedなenvironmentで,faultを考えながらも,integrityを保ち,高performanceでsystemが動く
- 監査でdata integrity check
- data指向applicationの倫理的問題