1
雷鋒網按:本文根據姜偉華博士在數果智能新產品發布會 “智能時代大數據實時分析技術 DaTalk” 上的演講整理而來。
實時大數據分析是指對規模巨大的數據進行分析,利用大數據技術高效的快速完成分析,達到近似實時的效果,更及時的反映數據的價值和意義。
所有人都能理解數據的時效性對于數據的價值至關重要。以唯品會為例,唯品會已經有一整套非常成熟的離線數據倉庫系統。這套系統對于業務有非常大的指導意義,但目前碰到的問題是如何將各種計算、報表加速,從原來天級別、小時級別,加速到近實時來。
這是我們開始實時離線融合這個項目的緣由。該工作我們是從 2016 年下半年開始的,到目前為止它仍然只是一個半成品,因此這里面包含的很多內容并不是最終的結論,在多數情況下,它僅僅是以唯品會的特點為基礎,而不一定能無縫地適用于其他公司產品。我們希望拋磚引玉,對大家有所俾益。
1. 時效性與大數據
第一個問題是:什么是實時(real-time)? 什么是離線(offline)?很多時候,我們會當然的把實時等同于流處理(stream processing),等同于 Storm、Spark Streaming。但其實所謂實時和離線的區別其實是從時延(latency)的角度出發,如果時延短的就是實時,時延長的就是離線。
而時延就是從數據產生到計算出結果的時間差,時延是從端到端的,不僅僅是 Query 的執行時間。采用簡單的式子表示即為:時延 = 數據準備時間 + 查詢計算時間。
實時、近實時 (near realtime)、離線一般是以時延的時間長短為區分標準。實時表示毫秒、秒級時延;近實時主要是分鐘級時延;而離線是時延超過十分鐘。

而何為批處理、流處理?批處理,也常被稱為 “離線”,即數據以一個完整的數據集被處理可以重復計算,數據在落盤之后定時或者按需啟動計算。一般情況下,批處理一次處理的數據量大,延遲較大,經常需要全量計算。流處理,也常被稱為 “實時”,即數據以流式的方式(增量)被處理,它與批處理的特點完成相反。

然而實時計算并不等同于流式計算,即使大多數實時計算是流式計算,但很多也可以采用批處理來實現。同時,雖然在流式計算中實時或者準實時計算結果占了較大比例,流式計算也完全可能需要較長時間才能出結果,比如說 30 分鐘的 window,window 結束才輸出結果等。

所以說,實時計算并不等同于流式計算。業務的實時化并不一定要借助于流式計算來實現。下面我們來看看目前數據處理中之所以實時化要流式計算的瓶頸在何處。
2. 現狀及問題
唯品會是電子商務網站,數據可以分成兩大類: 行為埋點數據和交易類數據。下圖是交易類數據的一條典型處理鏈路,行為類數據的處理與之非常類似。

這張圖其實代表了當前大數據處理的一種典型架構。對于實時和離線而言,這兩條路徑是從源頭開始就完全分離的。
對于離線 / 批處理而言,數據層層加工。用戶可以簡易地使用 SQL,使用門檻低,并且其工具、理論、系統完備。然而它的延遲性高,并且不可控制(特別是在大促時)。
對于流式 / 實時計算而言,一切以時效性為目標,鏈路短,數據無層次,大量的應用直接處理 raw data。所以它的唯一優處在于它的時效性。但是它的開發難度高,邏輯復雜,資源需求很大,并且很難保證其數據質量。同時,需要為每個應用單獨去開發其應用邏輯,無法通用化。
對于實時應用(特別是報表)來說,對數是最痛苦的一件事情。典型場景是利用實時報表提供結果,但仍需要定時和離線報表去比對其正確性。一般普遍認為離線應用的精度要高于實時應用,但實時和離線的處理方法是完全不同的,其開發方式、方法,處理邏輯、數據來源都不一致,導致對數非常困難。而這其中最根本的是因為實時和離線從最本源開始就是兩條計算路徑。要在這完全不同的兩條路徑上對數,難度就非常非常大了。
我們也一直在反思怎么樣才能更好的支持業務的實時化。因為業務方總是會在抱怨數據不準,和離線對不上,口徑沒更新,開發效率低下,周期時間長等狀況,明明我們也在努力加班,努力滿足業務方要求,卻發現總是不能滿足業務的需求。
3. 實時離線融合
目前的實時化方法真的是正確的打開方式嗎? 對于這個問題,我們的理解是:
業務需要的是近實時。絕大部分業務只需要時延在分鐘、甚至 5~10 分鐘級別就可以了。并不需要秒級的時延。所以用 Storm/Spark Streaming 這樣的流式計算去實現,其實是一種殺雞用牛刀的行為。
業務方需要近實時,但目前只有實時團隊才有能力實時化。這個的原因是流式計算的開發門檻太高。但其實業務方是希望以他們容易掌控的方式實現近實時,而不是交給實時團隊去排期開發。
基于上面的理解,我們開展了實時離線融合這個項目。這個項目的目的就是:
讓業務方以他們熟悉的批處理方法來實現近實時的計算。
讓實時團隊專注于系統和平臺,而不是業務。
時延 = 數據準備時間 + 查詢時間。目前之所以無法用批處理方法實現近實時的計算就是因為這兩個步驟各自花的時間太長了。如果數據準備速度足夠快,并且計算速度也足夠敏捷,那么批處理也可以達到近實時的時延。
對于批處理而言,數據準備時間 = 定時調度時間 + 數據準備計算時間。只有在兩者都很小的情況下,數據準備時間才能大幅度地縮短。所以對于數據準備來說,使用流式處理來實現數據的實時準備是非常合理的想法。同時,因為這種數據準備的一般是基礎數據,和業務邏輯關系不大,所以也是很適合用流式的方法來實現的。

實時離線融合鏈路圖
在這個鏈路中,流式計算、批處理共享相同的數據準備步驟(清洗、打寬)。這些步驟保證數據是在毫秒級別就能處理完成的。處理完成的數據會落地到 Hive 中去(時延控制在分鐘級別)。這樣,Hive 中就有了近實時的已經準備好的基礎數據。需要近實時的應用就可以去訪問這些數據了。
實時數據落地 Hive, 即將大批量數據實時處理之后存入 Hive 中,提供給后端業務系統進行處理。目前我們的做法是每 5 分鐘一個 Hive 分區,數據按照 event time 落到相應的 Hive 分區,等待一定時間后關閉這個分區(這里我們借鑒了流處理中的 watermark 概念)。同時為了與現有的 Hive 分區保持兼容(即對于一個已關閉分區的兩次查詢應該得到相同的結果),也為了保證分區能及時關閉,規定若其數據在分區關閉后才到達,那么該數據將會落地到下一個分區。
對于那些不關心分區是否已關閉,而時效性要求高的應用,其可以在分鐘級訪問到數據(未關閉的分區);而對于大部分應用而言,可以選擇分區關閉后再查詢(數據準備的時延就在 5~6 分鐘左右)。
這種數據高頻落地也是存在著一些問題的。
第一,小文件過多(為了保證落地時延,必須增加并發),會導致查詢變慢。
第二,以普通磁盤為主的 HDFS(Hadoop 分布式文件系統)時延不穩定(每個分區的數據快的幾秒就完成,慢的需要幾分鐘)。這就對數據落地的 Spark Streaming 任務帶來了挑戰。
為了改善這些情況,我們對歷史分區 compact 以減少其文件數; 將普通磁盤為主的 HDFS 替換為 Alluxio 和以 SSD 為主的 HDFS 以減少其落地波動。數據放在高速文件系統中,不僅對落地波動情況有所改善,也可提高讀取速率。
對于和離線系統的無縫對接,我們目前的做法是在每個分區關閉后,向離線調度系統發信號說這個分區數據準備完成了,這樣離線調度系統就可以正常調度依賴這個分區的下游任務了。

當數據準備實時化了后,如何縮短離線查詢時間呢?查詢時間 = 定時調度時間 + 查詢計算時間。要達到近實時,必須減少其調度時間與查詢計算時間來提高離線應用。那么我們需要將高頻調度定時為五分鐘甚至小于五分鐘,并且合理地控制資源使用量,在查詢計算時,保證其中間結果不落地,使用 Spark SQL、Presto 替代 Hive,并且使用 ElasticSearch、Druid、Kylin 等做預計算,從而減少計算量,加速查詢計算。

如上圖所示。離線應用的三個維度,分別是對 NRT 的要求(業務自身的屬性),實現最小時延的代價(人力資源、機器資源),對數據精度的要求。每個應用在實時化都要考慮如何在 3 者之間取得一個平衡。
這種平衡就決定了存在著三種模式。
第一種是零代價加速,通過實時數據落地,可以透明地享受 30-50 分鐘的加速;
第二種追求極致的近實時,應用越實時越好,不惜一切代價,投入大量人力物力完全地重新實現邏輯;
第三種介于兩者之間,追求在資源有限情況下去加速,但盡量不增加其計算負擔。

在實時離線融合的場景下,ES、Druid、Kylin 等的作用會越來越重要。因為如果應用能夠使用這些帶預計算的存儲來實現的話,那么查詢計算時間就可以基本忽略不計。同時,因為這些存儲并沒有 Hive 那樣的分區概念,所以清洗打寬完的數據其實是可以流式的落到這些存儲中去的(秒級)。那么,用戶就可以以類似離線 SQL 的方式實現秒級的數據查詢。
4. 實時離線融合帶來的挑戰
實時離線融合并不是免費的午餐。它也帶來了一系列新的問題和挑戰。
對于實時 / 流式計算而言,它變成了所有大數據處理的一個前置。這就要求其作為平臺具有很高的穩定性、可靠性、可管理性、數據質量、SLA 保證。特別是現有的在流處理系統(Storm、Spark Streaming、Flink)在理論上還沒有完全實現 end-to-end exactly once 的情況下。一般認為批處理系統(Hive、Spark)是非常可靠的,且支持 exactly once 語義。將基礎數據準備從批處理系統替換為流處理系統,怎么保證其可靠性不降低是一個非常大的挑戰。
如何確保 Hive 中數據的質量,目前我們的做法是多方著手:
1. 全鏈路監控,保證數據質量;
2. 考慮各種極端場景的處理方法;
3. 發現問題時,如何重寫整個 Hive 分區;
4. 保留目前的離線小時抽數邏輯用于對數。
5. 改造目前的流框架來提供更好的處理語義保證。
對于離線(Hive、Spark)來說,應用要實時化,就必須高頻調度。這也帶來了一系列挑戰。如何提高調度效率?如何處理在上一次調度沒執行完情況下下一個批次的調度問題(數據積壓)?如何防止過度占用系統資源?這需要對于調度系統和應用都進行改造。另外,我們需要區分熱數據和冷數據。熱數據使用單獨的 SSD 或者 Alluxio 集群,而冷數據存儲在普通的 HDFS 中。
實時離線融合我們目前也只是完成了很多基礎數據的實時化,目前已經能夠比較明顯的看到效果。但這個任務是長期的。因為用戶一般更加喜歡使用天表等很寬的表,而目前實時化的更多是小時表等基礎表,如何實時化(或者加速)天表等寬表是我們目前在推進的一項工作。只有等這部分工作完成后,我們才能說實時離線融合真正成功了。
作者介紹
姜偉華 博士,國內最早的 Hadoop 發行版:IDH 的產品開發經理。主要研究方向集中于對大數據開發,從事大數據開源工作,曾經在 Intel 期間 2 年之內團隊培養出 10 位 committer,創建了上海大數據流處理 Meetup,創建 2 個新的 Apache 項目。目前在唯品會負責實時平臺。
實戰特訓:遠場語音交互技術
智能音箱這么火,聽聲智科技CTO教你深入解析AI設備語音交互關鍵技術!
課程鏈接:http://www.mooc.ai/course/80
加入AI慕課學院人工智能學習交流QQ群:624413030,與AI同行一起交流成長
雷峰網版權文章,未經授權禁止轉載。詳情見轉載須知。