TechNight#71 Oracle Database 23c 新機能#1 Microservice関連新機能
Oracle Database Technology Night #71Oracle Database 23c マイクロサービス関連新機能Ryunosuke Deguchi日本オラクル株式会社クラウド・エンジニアリング統括Autonomous & Analytics ソリューション部2023/9/28
View Slide
1. Transactional Event Queues (TxEventQ)関連新機能2. Oracle Database Sagas3. Oracle SQL Access to KafkaAgendaCopyright © 2023, Oracle and/or its affiliates2
サービス間の影響を極小化しシステムの変更容易性を高めるアーキテクチャマイクロサービス・アーキテクチャとはCopyright © 2023, Oracle and/or its affiliates3保守とテストの容易性• 分割したサービス毎に組織を編成し開発・運用の自由度を高める• 更新単位を最小限にすることでテスト規模を最小化疎結合• API化や非同期化によりサービス間の結合度を低減• 変更による他の稼働中のサービスにへの影響を極小化独立してデプロイ可能• データソースやアプリケーション・モジュールをサービス毎で占有• デプロイやスケールの変更の単位サービス毎で任意に最適化APIサービス実装データストアイベント・ストア
Copyright © 2023, Oracle and/or its affiliates4Transactional Event Queues(TxEventQ)関連新機能
Oracle Databaseに統合された堅牢で多機能なメッセージ・キューイング・システム• TxEventQへのエンキュー/デキュー、TxEventQと他のメッセージングシステム間のメッセージ伝播機能を提供• 標準のデータベース機能(リカバリ、セキュリティなど)がサポートされる• データ駆動およびイベント駆動のアーキテクチャから求められる要件に対処• あらゆる機能が自動化されたマネージドなサービス• データベースと統合してパフォーマンスを最適化Oracle Advanced Queuing (AQ) /Oracle Transactional Event Queues (TxEventQ) とはメッセージを送信する側のクライアント メッセージを消費する側のクライアントProducer Consumerエンキュー デキューTxEventQ伝播伝播TxEventQ TxEventQCopyright © 2023, Oracle and/or its affiliates5
Web サーバ注文サービスAPI/BrokersモバイルIoTProducers Consumersモバイルアプリ検索ポータルダッシュボードリアルタイム分析、アラート機械学習モデル分析レポートアドホック分析Oracle Database(コンバージド/イベント・キュー)トランザクション• データの統合• マイクロサービスのサポート• オープンなインタフェース• 簡単なアプリ構築とAPI• バックアップ、セキュリティなどの管理タスクの軽減Oracle Advanced Queuing (AQ) /Oracle Transactional Event Queues (TxEventQ) とは6OCIオブジェクト・ストアOCIアーカイブ・ストアデータ + イベントCopyright © 2023, Oracle and/or its affiliates
Oracle TxEventQはOracle AQシャード・キューの21c以降の更新バージョンOracle AQおよびトランザクション・イベント・キュー(TxEventQ)の歴史Copyright © 2023, Oracle and/or its affiliates7Oracle8DatabaseOracle8iDatabaseOracle Database11gOracle9iDatabaseOracle9i DatabaseRelease2Oracle Database12c, 18c, 19cOracle Database21c, 23cTxEventQAQ クラシック・キューオブジェク/ADT;IOT;パーティション化AQ シングル・コンシューマキューAQマルチコンシューマキューOracleRealApplicationClustersAQ JMS標準サポートAQパフォーマンス最適化OracleStreams AQに名称変更TransactionalEvent QueuesKafka互換性の追加(高性能メッセージング /Events DB)Oracle AQOracle AQ単一データベース・ワークロードAQ シャード・キューAQJMS シャード・キューOracle AQに名称変更
AQ シャード・キューから進化した新しいメッセージ・キューイング・システム• 伝播機能• イベントストリームの同時実行性が向上• TxEventQ用のKafka Javaクライアント/Kafka実装の拡張• 多くの言語のサポート• リアルタイムなパフォーマンス監視• AQからTxEventQへのオンライン移行ツールAQシャード・キューからTxEventQへProducer ConsumerTxEventQTxEventQCopyright © 2023, Oracle and/or its affiliates8TxEventQ 伝播 伝播エンキュー デキュー
リモート・サブスクライバへの伝播• メッセージは1つのキューから別のキューに伝播できる• アプリケーションは同じデータベース・キューに接続されていなくても相互に通信可(宛先キューは同じデータベースでもリモート・データベースでも可能)➡ リモートでの処理のオフロード、バックアップ• 伝播により、多くの受信者にメッセージを展開可能、異なるキューのメッセージを1つのキューに結合することもできる• JMSセッション・レベルの順序付けセマンティクスを利用し、宛先のキューに送信Oracle AQ / TxEventQ のメッセージ伝播P伝播伝播C伝播異なるキューのメッセージを1つのキューに伝播PPCC1つのキューから異なる複数の別のキューに伝播Copyright © 2023, Oracle and/or its affiliates9
リモート・サブスクライバへの伝播2種類の伝播• キューからdblinkへの伝播• キューからキューへの伝播Oracle AQ / TxEventQ のメッセージ伝播Pキューからdblinkへの伝播Copyright © 2023, Oracle and/or its affiliates10dblinkサブスクライブ・キューキューからキューへの伝播Pdblink,destination_queue宛先キュー
TxEventQイベントストリームとはイベントストリーム(シャード):キュー表のパーティションで構成• 水平分割による高い同時実行性とスループットを実現する• イベントストリームは自動でパーティション化• インスタンス内にイベントストリームでエンキューセッションを分散• インスタンス内にすべてのEvent Streamsのデキュー• ローカルデキューアがない場合にメッセージをクロスインスタンス転送イベントストリームの同時実行性が向上RACインスタンス2キュー表RACインスタンス1ProducerCイベントストリーム1イベントストリーム2キュー表イベントストリーム1ProducerAProducerBConsumerConsumerConsumerCopyright © 2023, Oracle and/or its affiliates11
新機能:パーティション化されたイベントストリーム• TEQではイベントストリームがパーティション化されるように• 新しいパーティションは、必要に応じて自動的に作成される• パーティション内のすべてのメッセージがデキューされると、パーティションは切り捨てられ再利用されるイベントストリームの同時実行性が向上Copyright © 2023, Oracle and/or its affiliates12キュー表インスタンスConsumerProducerAイベントストリーム1パーティション1パーティション2パーティション3キュー表a1 a2 a3a1a2a3
新機能:RACを使用したときの同時実行性が向上• パーティション化されたイベントストリームとRACインスタンスにアフィニティを持たせる• 1つのイベントストリームの処理を1つのRACインスタンス内で完結させるようにする• 元々RACはサポートしていたが、新たにサポートされたパーティショニングと組み合わせて実現イベントストリームの同時実行性が向上Copyright © 2023, Oracle and/or its affiliates13キュー表RACインスタンス1イベントストリームProducerA Consumerパーティションキュー表RACインスタンス2イベントストリームProducerA Consumerパーティションa1a2a1a2
Apache Kafkaとは?• Apache Kafkaは、スケーラビリティに優れた分散メッセージキュー• 広く使用され、人気のあるオープンソースのイベントストリーミングおよびメッセージングシステム• 分散型のフォールトトレラントアーキテクチャで大量のデータを処理する機能を備える。KafkaとOracle Databaseの互換性Producer ConsumerKafka Brokers• Broker:Kafkaが稼働する 1 台のサーバー• Kafka Cluster:Kafkaが実行されているサーバ(Broker)をグループ化したもの• Zookeeper:Kafkaを管理するサーバCopyright © 2023, Oracle and/or its affiliates14
TxEventQがKafkaのインターフェースOKafkaを持つように• Kafka APIアプリケーションとOracle APIアプリケーションがメッセージの互換性を持つ• Kafka Java APIはOracle Databaseサーバーに接続し、メッセージング・プラットフォームとしてTxEventQを使用可能• KafkaをTxEventQに置き換えての使用が可能• KafkaとTxEventQが相互にメッセージのやりとりできるように• Kafka Java APIを使用することでKafkaからTxEventQへ、TxEventQからKafkaへ相互にメッセージを送れるようにKafkaとOracle Databaseの互換性KafkaのAPIを使用してTxEventQへのエンキュー・デキューが可能にKafkaとTxEventQのメッセージのやり取りが透過的にProducer ConsumerProducer ConsumerProducer ConsumerTEQTEQTEQTEQProducer ConsumerAPICopyright © 2023, Oracle and/or its affiliates15
アーキテクチャKafkaとOracle Databaseの互換性プロデューサ プロデューサ プロデューサコンシューマ コンシューマ コンシューマプロデューサ プロデューサ プロデューサコンシューマ コンシューマ コンシューマKafkaブローカprops.put("bootstrap.servers", ":9092") props.put("bootstrap.servers", ":1521")TxEventQZookeeperトピック1KafkaトピックおよびパーティションはTxEventQキュー表およびイベント・ストリームにマップされるプロデューサ1プロデューサ2パーティション1コンシューマ1コンシューマ2コンシューマ3相互運用または置換可能パーティション30 10 1パーティション20 1 2Copyright © 2023, Oracle and/or its affiliates16
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import java.util.Properties;public class SimpleProducerOKafka{public static void main(String[] args) {try {Properties props = new Properties();props.put("bootstrap.servers", “kafka:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer producer = new KafkaProducer(props);Future lastFuture = null; int msgCnt = 20000;for(int i=0;iProducerRecord producerRecord = new ProducerRecord("TxEventQ", i+"", "Test message # "+i);lastFuture = producer.send(producerRecord);}System.out.println("Produced "+ msgCnt +" messages."); lastFuture.get(); producer.close();}catch(Exception e) {System.out.println("Exception in Main " + e );e.printStackTrace();}}}Kafka Javaクライアント-プロデューサ既存のKafkaクライアント・アプリケーションをTxEventQと連動させる方法:この行を確認します。こちらCopyright © 2023, Oracle and/or its affiliates17
import org.oracle.okafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import java.util.Properties;public class SimpleProducerOKafka{public static void main(String[] args) {try {Properties props = new Properties();props.put("bootstrap.servers", "database:1521");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer producer = new KafkaProducer(props);Future lastFuture = null; int msgCnt = 20000;for(int i=0;iProducerRecord producerRecord = new ProducerRecord("TxEventQ", i+"", "Test message # " +i);lastFuture = producer.send(producerRecord);}System.out.println("Produced "+ msgCnt +" messages."); lastFuture.get(); producer.close();}catch(Exception e) {System.out.println("Exception in Main " + e );e.printStackTrace();}}}TxEventQのKafka Javaクライアント-プロデューサこのインポートをTxEventQ Kafkaプロデューサに変更Oracle Databaseをブートストラップ・サーバー・リストに追加コードの残りの部分の変更は必要なし!Copyright © 2023, Oracle and/or its affiliates18
•複数のコンシューマおよび遅延送信を含むJMS P2Pおよびpub/sub•Spring JMSサポート用のSpring Bootスタータ•Kafkaブローカをメッセージ・ブローカとしてTxEventQに置き換えるためのKafka Javaクライアントのサポート(Okafka)JavaJavaScript(Node.js)PythonC/C++ODP.NETPL/SQLRESTおよびSQLcl// get a connection to the databaseoracledb.initOracleClient({});connection = awaitoracledb.getConnection({user: 'pdbadmin',password: '********',connectString: 'db:1521/pdb1'})// enqueue a messageconst rawQueue = awaitconnection.getQueue("my_TxEventQ");await rawQueue.enqOne(“Hi Mom!");await connection.commit();import oracledbfrom os import environ as envoracledb.init_oracle_client()connection =oracledb.connect(dsn='172.17.0.2:1521/pdb1',user='mark',password='******')queue = connection.queue("myq", "JSON")queue.enqOne(connection.msgproperties(payload={"developer day": "today"}))connection.commit()PythonNode.js多くの言語のサポートCopyright © 2023, Oracle and/or its affiliates19
TxEventQでモニタリング可能な要素• TxEventQのパフォーマンス監視でモニタリングできる内容は次の通り:メッセージング・システムの正常性の確認エンキュー/デキューのスループット、キューの深さを含む、全体的な主要パフォーマンス指標の監視メッセージング・アクティビティに由来するCPUの負荷、メモリーの使用状況、およびデータベース待機クラスの監視各キューの正常性状態を確認して、パフォーマンスの低いキューを素早く特定• TxEventQのパフォーマンス監視は、データベースの動的パフォーマンス・ビューから情報を取得• キューの主要メトリックに関するレポートの出力は各種ユーザー・インタフェースと統合可能• オープン・ソース・ツールPrometheusおよびGrafanaを使用してメトリックの監視が可能リアルタイムなパフォーマンス監視Copyright © 2023, Oracle and/or its affiliates20
TxEventQモニタ・システムの構成リアルタイムなパフォーマンス監視• TxEventQのメトリックを視覚的に取得可能Oracle DatabaseTxEventQPrometheus GrafanaOracle Databaseから監視に必要な情報を収集するためのエージェントエクスポータを使用して収集したメトリックを保管するサーバーPrometheusの情報を使用してデータを視覚化するプラットフォームPrometheus のエクスポータ(oracledb_exporter)dockerコンテナ上で動作TxEventQのメトリックを取得するデータソースを呼ぶGrafanaで視覚化されたメトリック次のようなメトリック情報を取得: ステータス キューの数 サブスクライバの数 エンキュー/デキューのスループット メッセージの数Copyright © 2023, Oracle and/or its affiliates21
Grafanaの使用によりサマリーを視覚的に確認することが可能にTxEventQモニターではインスタンス・キュー・サブスクライバ・ディスク・グループを対象に次のサマリーを確認することが可能:• すべてのTxEventQ全体のサマリー• データベース・メトリックのサマリー• システム・メトリックのサマリー• TxEventQごとのサブスクライバのサマリーリアルタイムなパフォーマンス監視▲ システム・サマリー・ダッシュボード:システム・レベルのメトリックとキュー・レベルのメトリックを表示(CPU使用率と使用メモリーなど)▲ TxEventQサマリー・ダッシュボード:全体的に集計されたTxEventQの統計情報(ステータス、キューの数、サブスクライバの数、エンキュー率/デキュー率、メッセージの数など)▶ データベース・サマリー・ダッシュボード:全体的なDBのパフォーマンスと統計情報Copyright © 2023, Oracle and/or its affiliates22
• 移行ステップを自動化するPL/SQLパッケージDBMS_AQMIGTOOLの提供• 機能• AQの定義とデータをチェックし、移行の許可/不許可、適応可能かどうかのレポートと推奨事項の表示• 要件に応じた移行モードが選択可能:AUTOMATIC/INTERACTIVE/OFFLINE/ONLY_DEFINITION• 移行のコミット/フォールバックが選択可能• キューの移行履歴の確認AQ から TxEventQ へのオンライン移行ツールCopyright © 2023, Oracle and/or its affiliates23TxEventQとの互換性チェックStartCHECK_MIGRATION_TO_TXEVENTQINT_MIGRATION移行実行EndABORT_MIGRATIONCOMMIT_MIGRATION非サポート機能なしリスト生成→非サポート機能リスト成功
AQ シャード・キューから進化した新しいメッセージ・キューイング・システム• 伝播機能• イベントストリームの同時実行性が向上• TxEventQ用のKafka Javaクライアント/Kafka実装の拡張• 多くの言語のサポート• リアルタイムなパフォーマンス監視• AQからTxEventQへのオンライン移行ツールAQシャード・キューからTxEventQへProducer ConsumerTxEventQTxEventQCopyright © 2023, Oracle and/or its affiliates24TxEventQ 伝播 伝播エンキュー デキュー
Oracle Database SagasCopyright © 2023, Oracle and/or its affiliates25データ整合性を備えた分散トランザクション処理の実装
1. Sagaパターンとは2. Oracle DatabaseのSaga実装AgendaCopyright © 2023, Oracle and/or its affiliates26
SagaパターンとはCopyright © 2023, Oracle and/or its affiliates27サービス間をまたがるトランザクションの結果整合性の担保
結果整合性による一貫性相手のサービスのデータソースに直接アクセスできない• トランザクションによる一貫性が利用できない• 自サービスと相手のサービスそれぞれの「結果整合性」により全体の一貫性を保つマイクロサービスにおける一貫性の考え方Copyright © 2023, Oracle and/or its affiliates28在庫管理サービス注文管理サービス他のサービスのデータソースに直接アクセスできない
サービス間をまたがるトランザクションの仕組みSagaパターン補償トランザクションによる(事後)結果整合性• 処理の成功を前提とした楽観的な呼出し手法• 一部の処理が失敗した場合に、既に完了した処理を取り消す「補償トランザクション」により整合性を取るTCC (Try/Confirm/Cancel) パターン予約ベースの(事前)結果整合性• 処理完了の可否を事前に確認した上で処理を実施• Tryフェーズにより不整合の生じる処理を行わないことで整合性を担保マイクロサービスで検討される代表的な一貫性の仕組みCopyright © 2023, Oracle and/or its affiliates29InsertDeleteInsertReserveUpdateReserveUpdate
サービス間をまたがるトランザクションの仕組み補償トランザクションによる(事後)結果整合性• 処理の成功を前提とした楽観的な呼出し手法• 一部の処理が失敗した場合に、すでに完了した処理を取り消す「補償トランザクション」により整合性を取る向いているトランザクション制御の特性• トランザクションの存続期間が長く、1 つのマイクロサービスが長時間実行されても、他のマイクロサービスがブロックされることは避けたい• ワークフロー内の操作が失敗した場合にロールバックできる必要がある• 他のトランザクションと分離されやすく、補償処理に人手が関与しやすいユースケース• ユーザー単位で行われる受付業務• カート→チェックアウト→配送のような長期的なトランザクションSagaパターンとはCopyright © 2023, Oracle and/or its affiliates30InsertDeleteInsert
Oracle DatabaseのSaga実装Copyright © 2023, Oracle and/or its affiliates31データベース機能を利用したOSaga
組み込みコーディネータによるシンプルで堅牢性の高いアーキテクチャOracle Databaseを使用したSagaの実装Copyright © 2023, Oracle and/or its affiliates32TravelAgencyHotelServiceAirlineServiceメッセージ・ブローカー旅行代理店サービスSagaイニシエータ/コーディネータホテル予約サービスSaga参加者航空券予約サービスSaga参加者TravelAgencyPDB BrokerPDBHotelPDBAirlinePDBDBLINKを使用したDB間のメッセージ伝播AQ/TxEventQによる複数の参加者間の非同期通信SagaIDSagaIDJSONサポートSagaID補償トランザクションの自動実行
組み込みコーディネータによるシンプルで堅牢性の高いアーキテクチャOracle Databaseを使用したSagaの実装(正常パターン)Copyright © 2023, Oracle and/or its affiliates33TravelAgencyHotelServiceAirlineServiceメッセージ・ブローカー旅行代理店サービスSagaイニシエータ/コーディネータホテル予約サービスSaga参加者航空券予約サービスSaga参加者TravelAgencyPDBBrokerPDBHotelPDBAirlinePDBroom表flight表1. ホテル予約リクエスト2. ホテル予約処理3. ホテル予約OK4. 航空券予約リクエスト5. 航空券予約処理6. 航空券予約OK7. 予約確定
組み込みコーディネータによるシンプルで堅牢性の高いアーキテクチャOracle Databaseを使用したSagaの実装(異常パターン)Copyright © 2023, Oracle and/or its affiliates34TravelAgencyHotelServiceAirlineServiceメッセージ・ブローカー旅行代理店サービスSagaイニシエータ/コーディネータホテル予約サービスSaga参加者航空券予約サービスSaga参加者TravelAgencyPDBBrokerPDBHotelPDBAirlinePDBroom表flight表1. ホテル予約リクエスト2. ホテル予約処理3. ホテル予約OK4. 航空券予約リクエスト
組み込みコーディネータによるシンプルで堅牢性の高いアーキテクチャOracle Databaseを使用したSagaの実装(異常パターン)Copyright © 2023, Oracle and/or its affiliates35TravelAgencyHotelServiceAirlineServiceメッセージ・ブローカー旅行代理店サービスSagaイニシエータ/コーディネータホテル予約サービスSaga参加者航空券予約サービスSaga参加者TravelAgencyPDBBrokerPDBHotelPDBAirlinePDBroom表flight表1. ホテル予約リクエスト2. ホテル予約処理3. ホテル予約OK4. 航空券予約リクエスト5. 航空券予約処理失敗6. 航空券予約NG
組み込みコーディネータによるシンプルで堅牢性の高いアーキテクチャOracle Databaseを使用したSagaの実装(異常パターン)Copyright © 2023, Oracle and/or its affiliates36TravelAgencyHotelServiceAirlineServiceメッセージ・ブローカー旅行代理店サービスSagaイニシエータ/コーディネータホテル予約サービスSaga参加者航空券予約サービスSaga参加者TravelAgencyPDBBrokerPDBHotelPDBAirlinePDBroom表flight表5. 航空券予約処理失敗6. 航空券予約NG7. ロールバック指示8. ロールバック(自動補正)
優れたデータ一貫性の提供• Saga実装はデータベースに統合• Sagaのコード化、デプロイ、メンテンナンスが容易• Saga中断時のロールバック処理を行う組み込み済の自動補正ロジック(ロック・フリー列値の予約機能)• Oracle Databaseのスケーラビリティ、高可用性、一貫性、堅牢性の活用• Oracle Multitenantによる一元的なDB管理• ディクショナリ表やビューですべてのSaga参加者の一元的な状態管理が可能• アプリ開発者のハードル低減• 障害時のリカバリ・ロジックの記述が不要• 組み込み済のAQおよびTxEventQにより、メッセージおよびイベントの生成が可能-Kafkaコーディネータ不要• より高レベルのAPIおよびSpring Bootとの統合• Javaアノテーションの使用-既存コードの流用が可能• コンバージド・データベースの活用• アプリケーション・ペイロードのJSONサポートOracle Databaseを使用したSagaの実装Copyright © 2023, Oracle and/or its affiliates37
23c Oracle Saga Frameworkを試してみた その1:設定編23c Oracle Saga Frameworkを試してみた その2:実行編参考:Oracle Databaseを使用したSagaの実装サンプル(PL/SQL)Copyright © 2023, Oracle and/or its affiliates38
Oracle SQL Access to KafkaCopyright © 2023, Oracle and/or its affiliates39
Oracle SQL APIを使用してKafkaトピックに動的にクエリを実行できる• Oracle Database 23cからのネイティブ機能• SQL構文でKafkaストリーミングデータを処理• Oracle DatabaseにKafkaアクセスが統合されているため、外部クライアントが不要• DBMS_KAFKA および DBMS_KAFKA_ADMパッケージで構成されるCopyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)とはProducer0 1 20 10TopicPartition0Partition1Partition240
KafkaデータとOracle Database内のデータを組み合わせたデータ分析• データベースに保存せずに一時的な利用が可能データ処理はOracle Databaseのトランザクションとして実行・制御• トランザクションはデータベースのACID(原子性・一貫性・独立性・永続性)要件に準拠• トランザクションでオフセットを管理することで、システム障害時のKafkaレコードの消失やアプリケーションによる再処理を防ぐ(Kafkaデータの分離と耐久性の向上)アプリ開発者でなくともストリーミング・データの活用が可能• データ処理はSQL, PL/SQLで記述Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)のメリットOracle SQL APIを使用してKafkaトピックに動的にクエリを実行41
3つのデータ・アクセス・モード• ロード:Kafkaトピックのデータをデータベースのテーブルにロードし、様々なアプリからアクセス可能にする。主にDWH用途。• ストリーミング:Kafkaレコードを順番に1度だけSQLやPL/SQLを使って処理する。• シーカブル:指定した開始と終了のタイムスタンプ間のKafkaレコードにアクセスする。過去の時点のデータをまとめて取得する用途。OSaKがサポートしているデータ形式• 区切りテキストデータ(csvなど)• JSON• AvroCopyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK) の機能Oracle SQL APIを使用してKafkaトピックに動的にクエリを実行42
3つのデータ・アクセス・モード• ロード:Kafkaトピックのデータをデータベースのテーブルにロードし、様々なアプリからアクセス可能にする。主にDWH用途。Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK) の機能Oracle SQL APIを使用してKafkaトピックに動的にクエリを実行430 1 20 10TopicPartition0Partition1Partition2リアルタイム分析機械学習モデル分析レポート
3つのデータ・アクセス・モード• ストリーミング:Kafkaレコードを順番に1度だけSQLやPL/SQLを使って処理する。Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK) の機能Oracle SQL APIを使用してKafkaトピックに動的にクエリを実行440 10TopicPartition0Partition1Partition20 1 2ユーザー表0 User11 User2
3つのデータ・アクセス・モード• シーカブル:指定した開始と終了のタイムスタンプ間のKafkaレコードにアクセスする。過去の時点のデータをまとめて取得する用途。Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK) の機能Oracle SQL APIを使用してKafkaトピックに動的にクエリを実行45TopicPartition0Partition1Partition20 1 2 3 4 5 6 7 8 ・・・0 1 2 3 ・・・0 1 ・・・T1~T6T1 2T2 3T3 4・・・
1. 各Kafkaクラスタのクラスタ・アクセス・ディレクトリの作成• ディレクトリのREAD権限を与えることで、どのOracle DatabaseユーザーがKafkaクラスタにアクセスできるか制御するのに使用2. ORACLE_BASEのディレクトリにクラスタ・アクセス設定ファイルを格納するOSディレクトリの作成3. SYSDBAでデータベースにログインし、ディレクトリ・オブジェクトを作成Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)Kafkaクラスタの登録手順SQL> create directory osakaccess_kafkaclus1 AS ‘’;mkdir –p /u01/app/oracle/osak/kafkaclus1/config;SQL> create directory OSAK_KAFKACLUS1_CONFIG AS ‘/u01/app/oracle/osak/kafkaclus1/config’;46
4. Kafkaクラスタにアクセスするために必要な情報を含む設定ファイルosakafka.propertiesの作成• OSaK管理者によって作成され、管理される• キーと値の組(key=value)のテキストファイル• $ORACLE_BASE/osak/cluster-name/configに格納される以下はOCI Streaming Service(OSS) Kafkaクラスタにアクセスするためのosakafka.properties設定例5. クレデンシャルの作成(SSL.key.locationまたはsasl.usernameプロパティを使用している場合)Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)Kafkaクラスタの登録手順security.protocol=SASL_SSL #Kafkaブローカーとの通信に使用されるセキュリティ・プロトコルsasl.mechanism=PLAIN #認証に使用するSASLメカニズムsasl.username=// #Kafkaクラスタとの認証に必要なユーザー名max.partition.fetch.bytes=1048576 #1パーティションにつき1MBまでのリクエストサイズ制限(推奨)SQL> BEGINdbms_credential.create_credential(credential_name => ‘KAFKACLUS1CRED1’,username => ‘KAFKACLUS1’,password => ‘ssl-key-password or sasl-password’)END;/47
6. SYSDBAとしてログインし、DBMS_KAFKA_ADM.REGISTER_CLUSTERプロシージャでKafkaクラスタを登録Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)Kafkaクラスタの登録手順SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER(‘KAFKACLUS1’, ‘cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092’, ‘OSS’, ‘OSAKACCESS_KAFKACLUS1’, ‘KAFKACLUS1CRED1’,’OSAK_KAFKACLUS1_CONFIG’, ‘Mytest cluster kafkaclus1’) from dual;--登録成功の場合0が出力されるDBMS_KAFKA_ADM.REGISTER_CLUSTER('KAFKACLUS1','CELL-1.STREAMING.AP-TOKYO-1.OCI.OR--------------------------------------------------------------------------------048
1. ロードする表の作成2. ロード・アプリケーションの作成Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)例:KafkaデータをOracle DatabaseのテーブルにロードSQL> create table address(name varchar2(30),address varchar2(100),city varchar2(30),state varchar2(2),zipcode integer);SQL> DECLAREv_options varchar2(50);BEGINv_options := ‘{“fmt”:”DSV”, “reftable”:”address”}’;DBMS_KAFKA.CREATE_LOAD_APP(‘KAFKACLUS1’, ‘AddressApp’, ‘OSaKtestStream’, v_options);END;/49
3. メッセージのPublish4. ロード・アプリケーションの実行Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)例:KafkaデータをOracle DatabaseのテーブルにロードSQL> DECLAREv_records_inserted integer;BEGINv_records_inserted := 2;DBMS_KAFKA.EXECUTE_LOAD_APP(‘KAFKACLUS1’, ‘AddressApp’, ‘address’, v_records_inserted);END;/$KAFKA_HOME/bin/kafka-console-producer.sh –bootstrap-server cell-1.streaming.ap-Tokyo-1.oci.oraclecloud.com:9092 –topic OSaKtestStream –producer.config $KAFKA_HOME/config/producer.properties>Fred Flintstone,2 BamBam Way,Bedrock,PA,04040>Barney Rubble,2 Dino Dr,Bedrock,PA,0404050
5. ロードしたデータの確認Copyright © 2023, Oracle and/or its affiliatesOracle SQL access to Kafka(OSaK)例:KafkaデータをOracle DatabaseのテーブルにロードSQL> SELECT * FROM ADDRESS;NAME ADDRESS CITY ST ZIPCODE--------------- ------------------ -------------- -- -----------Fred Flintstone 2 Bambam Way Bedrock PA 4040Barney Rubble 2 Dino Dr Bedrock PA 404051