第32回 中国地方DB勉強会 において、発表させてもらいました。
Change Data Capture についての紹介です。
γεςϜϦϓϨʔεͷதͰCDCΛऔΓೖΕͯΔ2023-10-07தࠃํDBษڧձຊPostgreSQLϢʔβʔձߴڮ Ұٍ
View Slide
ࣗݾհ• ߴڮ Ұٍ (@ikkitang)• ελʔϑΣεςΟόϧגࣜձࣾTech Product Manager• TypeScriptͱGo• ຊPostgreSQLϢʔβʔձதࠃࢧ෦
͓͠ͳ͕͖• CDC࠾༻ͷഎܠ• CDCͱʁ• Debeziumͷಛ
ҙࣄ߲• εϥΠυެ։ࡁΈͰ͢• ࣭ͱ͔͋Ε #ChugokuDB Λ͝׆༻͍ͩ͘͞ʂ• ͷ࣠ͱͯ͠"γεςϜϦϓϨʔε"͕ग़ͯ͘ΔͷͰ՝ײͷڞײͷҙਤΛࠐΊͯॴଐͷձࣾͷΛଟ͘͠·͕͢࠾༻త͡Όͳ͍Ͱ͢• ※ CDC࠾༻ͷഎܠ͕݁ߏϘϦϡʔϜΛ͠Ίͯ͠·͍·ͨ͠
CDC࠾༻ͷഎܠ
CDC࠾༻ͷഎܠελʔϑΣεςΟόϧגࣜձࣾ• 20097݄7ۀʢ15ظʣ• BtoB(C) ͚ͷϏδωεϞσϧΛల։• ҿ৯ళ༷ͷECࢀೖɾӡ༻Λτʔλϧࢧԉ• ECαΠτ্Ͱจɾܾࡁड• ͷडऔ• ͷ͓ಧ͚
CDC࠾༻ͷഎܠελʔϑΣεςΟόϧגࣜձࣾͷϓϩμΫτͷ՝ײ• 3छྨͷΞΫλʔ• ύʔτφʔ, ૹύʔτφʔ, ٤৯ऀ༷• 15ظͷϏδωεΛࢧ͑ͨෳࡶͳۀϑϩʔʢిʣ• ͦΕΒΛશ෦ࢧ͑Δ "தԝूݖͷΞϓϦέʔγϣϯ"• ʮγεςϜͷ߹Ͱ...͝ڠྗΛ͓ئ͍͍ͨ͠·͢ʯ
CDC࠾༻ͷഎܠγεςϜϦϓϨʔεΛΔ• γεςϜϦϑΝΫλϦϯάͰͳͯ͘ɺϦϓϨʔε.• ϦϑΝΫλϦϯά֎෦తͳৼΔ͍Λม͑ͳ͍ࣄ͕ఆٛ• ৼΔ͍ม͑ͳ͍ͱɺ՝ղܾʹͳΒͳ͍ͷͰม͍͑ͯ͘• ٕज़తʹ͖Ε͍ʹͳ͚ͬͨͩͷچγεςϜͭ͘Βͳ͍
CDC࠾༻ͷഎܠϦϞσϦϯάΛΔ• ϢʔβʔͷߦಈಈػΛੳͯ͠ײతͳυϝΠϯʢ༻ޠɾUIʣͱͯ͠࠶ఆٛ͢Δ• ϦϞσϦϯάͨ͠ϞσϧΛѻ͏৽γεςϜΛఏڙ͢Δ
CDC࠾༻ͷഎܠϞσϧ͕ίϯςΩετΛ·͙ͨ• ҰํͰɺҰਓͷΞΫλʔ͔ΒݟͨײతͳυϝΠϯɺผͷΞΫλʔ͔ΒݟΔͱײత͡Όͳ͍ࣄ͕͋Δ• ʮΞΫλʔ͕มΘΔͱؔ৺͕มΘΔʯΛແࢹͯ͠৽γεςϜͰѻ͍ͬͯ͘ͱ·ͨಉ͕͡ൃੜ͢Δ• ͏·͘ίϯςΩετΛ·͍ͨͰ͍͘Έ͕ඞཁ=> ΠϕϯτۦಈܕΞʔΩςΫνϟΛ࠾༻• ྫɿ ύʔτφʔ͕"Λొ"ͨ࣌͠ɺͦͷαϙʔτνʔϜͷ"৹ࠪͪ"ϦετʹฒͿ
CDC࠾༻ͷഎܠ·Ͱ͏গ͠Ͱ͢
CDC࠾༻ͷഎܠΠϕϯτۦಈܕΞʔΩςΫνϟ• ΞϓϦέʔγϣϯΛߏ͢Δίϯϙʔωϯτ͕Πϕϯτʹج͍ͮͯಈ࡞͢Δઃܭύλʔϯ• ಛఆͷΠϕϯτ͕ൃੜͨ࣌͠ʹదʹΞΫγϣϯΛى͜͢ࣄΛత• × Λొͯ͠ʂ• ̋ ͕ొ͞Ε·ͨ͠ʂ• εέʔϥϏϦςΟ, ॊೈੑ͕ߴ·Δ• Apache Kafka ͱ͍͏ϛυϧΣΞͷγΣΞ͕ଟ͍
CDC࠾༻ͷഎܠApache Ka)aΛ༻͍ͨΠϕϯτۦಈܕΞʔΩςΫνϟ• ύʔτφʔ༻ͷΞϓϦέʔγϣϯ• SoEʢྑ͍UXΛఏڙ͍ͨ͠!ʣ• ͚ࣾཧը໘ͷΞϓϦέʔγϣϯ• SoRʢ͍ͭ୭͕ͲΜͳࢦࣔͷجͰ࣮ߦ͔ͨ͠. ཤྺʹॏ͖Λஔ͘ʣ• ΞΫλʔΞϓϦέʔγϣϯͷੑ࣭ͷҧ͍Λka-aΛഊࢭͱͯ͠औΓೖΕΔࣄͰંΓ߹͍͚ͭͯΔ
ҰํͰ..!
ແࢹͰ͖ͳ͍چγεςϜ• ݱঢ়Ϗδωεಈ͍͍ͯͯͯɺಉ࣌ӡ༻ආ͚ΒΕͳ͍• ઌఔઃܭͨ͠ka$aͷϝοηʔδͷૹ৴ΛچγεςϜʹରͯ͠खΛೖΕΔࣄ͍͠• ͳΜͱ͔ɺچγεςϜͱ͏·͘࿈ܞΛऔΔࣄ͕ग़དྷͳ͍͔ʁ
CDCΛ࠾༻͢Δͧʂʂ
CDC ͱ
CDCͱ• Change Data Capture• σʔλϕʔεͷมߋʢInsert,Update,DeleteʣΠϕϯτΛݕ͠ผͷΞϓϦέʔγϣϯʹୡ͢ΔͨΊͷٕज़ख๏• ࠾༻ࣄྫ• ৯ϩά͞Μɿ Elas5cSearch IndexͷϦΞϧλΠϜߋ৽• Chatwork͞Μɿ CQRSϞσϧͷߋ৽(ಡΈࠐΈϞσϧͷߋ৽)• ΦϛΧϨ͞Μɿ σʔλϕʔεϦϑΝΫλϦϯά• Debezium ౕ͕ͬͯྑ͍Β͍͠
Debeziumͷಛ
Debezium ͷಛ• Apache Ka*a Connect ʹ४ڌͯ͠࡞ΒΕͨ CDCͷπʔϧηοτ• Debeziumࣗମ DBͷมߋΠϕϯτΛKa*aͷϝοηʔδͱͯ͠ൃՐ͢Δ• DB͝ͱʹConnectorΛͬͯมߋΠϕϯτͷݕΛ࣮ݱ͢Δ• MySQL, PostgreSQL, SQL Server, Oracle, MongoDB, CloudSpanner, ...
༻ྫ͜ΜͳJSONΛ࡞Δ.{"name": "connector໊","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","ུ": "ུ","database.hostname": "{mysql}","database.port": "{port}","database.user": "{user}","database.password": "{password}","database.include.list": "{schema}","table.include.list": "{schema.table1,...}","snapshot.mode": "initial"}}͜Μͳσʔλ͕ಘΒΕΔ// Insert{"before": null,"after": { "id": 1, "name": "init name" }}// Update{"before": { "id": 1, "name": "before name" },"after": { "id": 1, "name": "after name" }}// Delete{"before": { "id": 1, "name": "after name" },"after": null}
ӡ༻ʹ͓͍ͯؾΛ͚ͭΔࣄDebezium• ConnectorͰಡΈࠐΉςʔϒϧΛՃ͢Δ࣌৬ਓ͕ܳඞཁʢΫισΧίωΫλʔ໊Λ͚ͭͳ͍ʂ 1ഊʣMySQL• ίωΫλఆٛ࣌ͷσϑΥϧτͷڍಈͱͯ͠ςʔϒϧͷશϨίʔυΛಡΈऔΔʢεφοϓγϣοτʣ• ಡऔதʹಉ͡Ϩίʔυ͕ಡΊΔΑ͏ʹϩοΫऔΔʢάϩʔόϧϦʔυϩοΫ or ςʔϒϧϩοΫʣ• ΊͪΌͪ͘ΌͰ͔͍ςʔϒϧΛಡΉ͍࣌ʢ͍ 1ഊʣPostgreSQL• ಡऔઐ༻γϦΞϥΠβϒϧϨϕϧͷτϥϯβΫγϣϯΛషΔ• ϩοΫແ͍
ӡ༻ʹ͓͍ͯؾΛ͚ͭΔࣄͷվળྫ• DebeziumͷϩοΫΛؾʹ͢Δ߹ɺCDCઐ༻ͷReadReplicaΛཱͯΔ• Debeziumࣗମʹࢮ׆ࢹΛೖΕ͓ͯ͘ͷ͕ྑ͍• αʔόʔ֤λεΫͷੜ͖ࢮʹ• ʮࡴ͞Ε·ͨ͠ɻࣗಈͰىಈग़དྷͳ͍ͷͰखಈͰىͯ͜͠ʯͬͯϩάʹग़ΔͷͰͦΕͷࢹ• ↑ ։ൃɾݕূڥͳͲͷ DBΛࢭΊΔڥͰൃੜ͢Δ• Subscrip6on Filter Ͱݕͯ͠ LambdaͰ Restart API Λࣗಈ࣮ߦ͢Δ
(10ϲ݄)ӡ༻ͯ͠ΈͯಘΒΕͨݟɾײ• ΊͪΌͪ͘Ό͍• APIʹϦΫΤετ͛ͯϦϩʔυͨ͠Βө͞ΕͯΔ• RESTͰͷઃఆָ͕• IaaC• ެࣜυΩϡϝϯτͷཏੑ• ϋϚͬͯɺجຊެࣜDoc͕Ҿ༻͞ΕͯΔ• ϑϧϚωʔδυ͡Όͳ͍• MSKͰka'a-connectαϙʔτͯ͠Δ͚Ͳɺӡ༻ʹͷͤΔͷ͕ϦεΫ• ઃఆϛεͬͯΔͱͣͬͱϦτϥΠʢREST͑ͳ͍ʣ• ϩʔΧϧڥ• DBͱͷଓ͕Ұఆࢭ·Δ࣌ʹ૬ੑѱΊ
Ҏ্Ͱ͢ʂ