Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Spark_Task_Optimization_Journey_How_I_Increased_10x_Speed_by_Performance_Tuning

tlyu0419
October 01, 2023

 Spark_Task_Optimization_Journey_How_I_Increased_10x_Speed_by_Performance_Tuning

Conference: PyCon Taiwan 2023
Date: 2023/9/2
Abstract:
公司長期以來存在某些工作排程需要較長的執行時間,由於專案使用的資料表本身就較為肥大,我們很容易自然的接受這些排程需要較長執行時間、資源的現狀,然而隨著上線的排程數量的快速增加,系統負擔與效能的問題也開始受到越來越多的重視
除了盲目加大硬體資源的方案外,其實我們可以先從 spark 的資源配置、參數設定以及程式邏輯等面向進行優化,讓工作排程能兼顧執行效率與系統資源
在這次的演講中我將以公司某項工作排程為例,和大家分享我分析這項工作排程的思路,並說明我逐步測試、優化的過程,最終成功將排程的執行時間從 5 小時大幅縮短至 30 分鐘

tlyu0419

October 01, 2023
Tweet

More Decks by tlyu0419

Other Decks in Science

Transcript

  1. • 公司長期以來存在某些工作排程需要較長的執行時間,
    由於專案使用的資料表本身就較為肥大,我們很容易自
    然的接受這些排程需要較長執行時間、資源的現狀,然
    而隨著上線的排程數量的快速增加,系統負擔與效能的
    問題也開始受到越來越多的重視
    • 除了盲目加大硬體資源的方案外,其實我們可以先從
    spark 的資源配置、參數設定以及程式邏輯等面向進行
    優化,讓工作排程能兼顧執行效率與系統資源
    • 在這次的演講中我將以公司某項工作排程為例,和大家
    分享我分析這項工作排程的思路,並說明我逐步測試、
    優化的過程,最終成功將排程的執行時間從 5 小時大幅
    縮短至 30 分鐘
    PyCon Taiwan 2023 September 2-3, 2023
    Spark Task Optimization Journey:
    How I Increased 10x Speed by Performance Tuning
    About 游騰林(tlyu0419)
    騰林是來自國泰數據部的資料科學家,擅長透過 Python,
    R 建置機器學習/深度學習模型、網路爬蟲、資料視覺化
    以及圖形演算法。曾在 PyCon, PyData, AIAcademic,
    MOPCON, COSCUP 等年會擔任講者
    代表的開源專案是 facebook-crawler 的 Python 套件,
    可以協助使用者快速收集 Facebook 粉絲頁/社團的貼文
    資料,發佈至今已進累積超過3萬次的下載量

    View Slide

  2. The process of a secret ML/DL project
    • 模型案整體跑約 5 小時,因為資料本身較肥大,大家認為這是正常的現象
    • 隨著專案上線的數量變多,效能的問題就開始被重視
    2
    後處理
    預處理 訓練模型 預測 送檔
    讀取資料
    Hadoop
    TeradataSQL
    資料前處理
    特徵工程
    特徵選擇
    模型調參
    預測機率值 業務邏輯加工 寫表 / 落檔
    發Mail通知
    <5mins
    <5mins
    <10mins
    <1 hours
    <1.5hrs
    >3hrs

    View Slide

  3. 請公司加錢買更好的設備 程式、性能優化
    Solutions for performance optimization
    3

    View Slide

  4. Outline
    Methods Speedup Tips

    View Slide

  5. Sample Table
    • 某張很肥大的信用卡消費資料表(這邊是通過 faker 生成假資料)
    • 姓名、身份證號、卡號、消費日期、消費公司、金額、消費地址
    • 實際的資料中有數億筆記錄,約 80 GB
    5
    Ref: joke2k/faker: Faker is a Python package that generates fake data for you.

    View Slide

  6. Methods of transferring (bulk) data
    • Local files
    • Apache Sqoop
    • pandas read_sql
    • pyspark read table
    6

    View Slide

  7. Export and read data through local files
    • 把資料先從 DB 中匯出落成實體檔,然後再用 Python 讀取實體檔
    7
    缺點是多一段資料匯出的流程,
    也會佔用 Disk 空間,業界通常不會用此方式

    View Slide

  8. Apache Sqoop
    • Apache Sqoop is a tool designed for efficiently transferring bulk data
    between Apache Hadoop and structured datastores.
    • Sqoop import help
    • --connection-manager
    • --connect
    • --user
    • --password
    • --query
    • --hive-table
    • Sqoop 約需 2 小時
    8
    Ref: Sqoop - (apache.org)

    View Slide

  9. pandas read_sql
    • 通過 SQLAlchemy / Jaydebe 建立 connector,再以 pandas.read_sql 讀取資料
    • Connector
    • host
    • user
    • password
    • pandas.read_sql
    • sql
    • con
    • chunksize
    • Pandas 需要 > 3.5 小時
    9
    Ref: pandas.read_sql — pandas 2.0.3 documentation

    View Slide

  10. Pyspark read table
    • 透過 Pyspark 直接至 Database 讀取資料
    • dbtable
    • 放資料表名稱
    • 執行後會讀取整張資料表
    • query
    • 放 sql 的語法
    • 可以彈性讀取/整理資料表
    • 兩個讀取方式都需要 3 小時
    10
    Ref: JDBC To Other Databases - Spark 3.3.2 Documentation

    View Slide

  11. • Pysaprk 下有非常繁雜的參數
    11
    Pyspark connection properties
    Ref: JDBC To Other Databases - Spark 3.3.2 Documentation

    View Slide

  12. • Pysaprk 下有非常繁雜的參數
    12
    Ref: JDBC To Other Databases - Spark 3.3.2 Documentation
    Pyspark connection properties
    • 和查詢效能有關的參數
    • Fetch_size
    Determines how many rows to fetch per
    round trip. This can help performance on
    JDBC drivers
    • Partition Column
    They describe how to partition the table
    when reading in parallel from multiple
    workers.
    • numPartition
    The maximum number of partitions that
    can be used for parallelism in table
    reading and writing.

    View Slide

  13. fetchsize
    • 官方文件說調大 fetchsize 可以增加查詢速度,但我沒有試出來
    • 測試後需要 3.2 小時,與預設參數的速度差不多(至少沒有明顯增加)
    • 後續可以再多測試 100, 1000, 100000, 1000000 等不同的參數值
    13
    Ref: Read JDBC in Parallel using PySpark

    View Slide

  14. partitionColumn
    • partitionColumn 要搭配 upper/lowerBound 和 numPartitions 一起使用
    • upperBound 和 lowerBound 分別為資料的上下界,藉由 上/下界 和 numPartitions 的數量,
    將原資料表切分成多份子集,分頭進行工作,藉此提升讀取效率
    • partitionColumn 必須要是數值、時間、日期格式,且並不能與 query 模式共用
    14
    以 TXN_AMT(刷卡金額) 為例,
    相關參數設定如下:
    • upperBound: 100
    • lowerBound: 0
    • numPartitions: 4
    會切分出 4 個子資料集
    <= 25, 25 ~ 50, 50~75, >75

    View Slide

  15. partitionColumn experiment round 1
    • 以刷卡金額(txn_amt) 作為 partition column
    • LowerBound, UpperBound 分別設定資料中的最大(9,998,506) 和 最小值(-9,415,705)
    • numPartition: 10
    • 測試後需要跑 4.5小時,反而比不調整設定的 3小時 還多 50% 的時間,Why?
    15

    View Slide

  16. partitionColumn experiment round 1
    • 進到 Application detail 後可以看到如下資訊
    • 9 個 worker 在很短時間內就完成執行,剩下 1 個 worker 則還在拼命跑
    • 問題明顯在 10 個 worker 間的分工並不平均!
    • 再回到 upper / lowerBound 和 numPartitions 的邏輯,估計問題是出在是資料
    表中的刷卡金額過度集中在某些區段中,才導致了這個問題
    16
    worker1
    worker2
    worker3
    worker4
    worker5
    worker6
    worker7
    worker8
    worker9
    worker10

    View Slide

  17. partitionColumn experiment round 2
    • 既然切成 10 份 partition 會遇到不均衡的問題,那麼切成 1,000 份呢?
    這樣總能讓 10 個 worker 都有機會輪到比較繁重的工作了吧!
    • 結果: 不均衡的問題一樣存在,而且…
    17

    View Slide

  18. partitionColumn experiment round 2
    • 既然切成 10 份 partition 會遇到不均衡的問題,那麼切成 1,000 份呢?
    這樣總能讓 10 個 worker 都有機會輪到比較繁重的工作了吧!
    • 結果: 不均衡的問題一樣存在,而且…
    18

    View Slide

  19. So, how to set partitionColumn correctly?
    • 要 By Domain 設定正確的欄位來切 partition 才能讓資料子集數量均衡
    • 以交易日期(txn_date)來切資料並切成 10 個子集,最終時間縮短為 30 分鐘
    • 如果需要再加速,還可以 適度 調大 spark 的 executor 數和 numPartition 數
    19
    worker1
    worker2
    worker3
    worker4
    worker5
    worker6
    worker7
    worker8
    worker9
    worker10

    View Slide

  20. Pyspark read table - Advanced usage
    • 困境
    • partitionColumn 的設定很吃 Domain,設定
    錯誤不只無法加速還會變慢
    • partitionColumn 只吃數值或時間類型欄位,
    而有些資料表根本沒有合適的欄位
    • 解方
    • 先用 count 算資料表的筆數
    • 將 query 轉為子查詢,並新增 row_number
    欄位作為 partition column,藉此均勻的切資料
    • 將子查詢的查詢語法放入 dbtable 中使用
    20

    View Slide

  21. Spark resource configuration
    • Spark 參數的設定方式
    • numPartition: 10(推薦) ~ 50
    > 發給 倉儲 請求(request)資料的次數,
    過多就會佔用 TD 的 Session 通道
    > 但並行的數量還是取決於 executor 數!
    • spark maxExecutor: 5(推薦) ~ 10
    > 並行向 TD 請求資料的線程數,過大會佔用太
    多倉儲的效能
    > 調大可以增加查詢速度
    • spark executor memory: 5G(推薦) ~ 10G
    > 當 numPartition 增大時,可以再降低
    executor 的 memory
    21
    這邊放畫面的截圖

    View Slide

  22. Spark SQL's Catalyst Optimizer
    22
    左邊和下面的 SQL 都是在算每位客戶的總刷卡金額
    左邊這種寫法的查詢效能會比較差嗎? 為什麼?
    (實際上沒人會用左邊這麼奇怪的寫法,單純用來說明)
    Ref: Deep Dive into Spark SQL's Catalyst Optimizer

    View Slide

  23. cache
    • 在做特徵工程時,經常會對特定的資料表
    先製作中繼表,再透過中繼表抽取出需要
    的變數
    • 如果能將 RDD1 的階段暫存起來,就不用
    每次都從 RDD0 開始讀取/處理資料,進而
    達到節省資料的目的
    • 暫存的使用方式是 cache
    23
    Ref: Spark技术内幕:深入解析Spark内核架构设计与实现原理

    View Slide

  24. Inspect spark dataframe's partition
    • 如果 Partition 內的資料筆數不均勻,會無法充分幫 Spark 的工作加速
    • 要適時檢查各階段的狀態,搭配 repartition 來平衡各 partition 中的資料量
    24

    View Slide

  25. Inspect spark dataframe's partition
    • 如果 Partition 內的資料筆數不均勻,會無法充分幫 Spark 的工作加速
    • 要適時檢查各階段的狀態,搭配 repartition 來平衡各 partition 中的資料量
    25
    這個範例資料表共分出了 120 個 partition,
    其中資料筆數有相當大的落差,無法充分加速

    View Slide

  26. Spark dataframe to pandas dataframe
    • 對某張 spark dataframe 先做 repartition 後再 toPandas
    • 資料表約 1,000 萬筆資料,33個欄位
    • Repartition 數量分別試了 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,50,100,500, 1000, 5000, 10000, 100000
    26
    Repartition 數 執行結果 時間
    38(Default) 成功 1.2 mins
    1-5 Executor Lost Failure
    6-10 成功 1.5 - 2.5 mins
    50-10000 成功 0.8 - 1.1 mins
    100000 成功 2 mins
    為什麼明明是操作同樣的資料表,
    有些 partition 數會失敗,有些可以加速?

    View Slide

  27. pandas dataframe to Spark dataframe
    • 對某張 pandas dataframe 先做 createDataFrame,再 saveAsTable
    • 資料表約 1,000 萬筆資料,33個欄位
    • 不改參數的情況下, createDataFrame 用了40分鐘, saveAsTable 用了 20小時還跑不完
    • 到底慢在哪裡?
    • saveAsTable: 檢查轉換後的 spark dataframe 的 partition 數只有 2
    >>發現: 資料表切的 partition 數太少就沒辦法做加速,相當於用二個線程在做事情
    • createDataFrame: 根據警告訊息找到要新增 ARROW_PRE_0_15_IPC_FORMAT 的環境變數
    >> 發現: 新增環境變數後 40 分鐘僅需 1 分鐘就完成轉換
    27
    Ref: [SPARK-29367] pandas udf not working with latest pyarrow release (0.15.0)
    新增 ARROW_PRE_0_15_IPC_FORMAT 的環境變數
    是讓 spark 通過 pyarrow 套件來加速資料轉換的過程

    View Slide

  28. Summary
    • 說明 pyspark 拉資料的參數設置方式與陷阱
    • partitionColumn 的設定很吃 Domain,設定錯誤不只無法加速還會變慢
    • 解決方式(目前試到比較快的方法)
    • 對資料表新增 row_number 的欄位,並藉此作為 partitionColumn 來進行加速
    • Spark 資源配置建議與調整方式
    • 其他 spark 任務的加速技巧
    • Cache
    • Partition number
    • Spark dataframe to pandas dataframe
    • pandas dataframe to Spark dataframe
    28

    View Slide

  29. 國泰產險 - 數據部相關職缺介紹
    資深資料科學分析師
    資料科學分析師
    數據專案管理師
    商業數據分析師 資料科學工程師
    資料倉儲工程師
    Contact 游騰林(tlyu0419)
    tlyu0419@Facebook, Twitter, Instagram
    Mail: [email protected]

    View Slide

  30. 30
    QA?

    View Slide