亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Spark學習實例(Python):窗口操作 Window

系統 1771 0

說到流處理,Spark為我們提供了窗口函數,允許在滑動數據窗口上應用轉換,常用場景如每五分鐘商場人流密度、每分鐘流量等等,接下來我們通過畫圖來了解Spark Streaming的窗口函數如何工作的,處理過程圖如下所示:

Spark學習實例(Python):窗口操作 Window Operations_第1張圖片

上圖中綠色的小框框是一批一批的數據流,虛線框和實線框分別是前一個窗口和后一個窗口,從圖中可以看出后一個窗口在前一個窗口基礎上移動了兩個批次的數據流,而我們真正通過算子操作的數據其實就是窗口內所有的數據流。

在代碼實現前了解下窗口操作常用的函數有:

  • window
  • countByWindow
  • reduceByWindow
  • reduceByKeyAndWindow
  • reduceByKeyAndWindow
  • countByValueAndWindow

window最原始的窗口,提供兩個參數,第一個參數是窗口長度,第二個參數是滑動間隔,返回一個新的DStream, 返回的結果可以進行算子操作,代碼實現如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
    dstream = lines.window(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-13 19:46:45
    # -------------------------------------------
    # hello
    # world
    ssc.start()
    ssc.awaitTermination()
            
          

現在終端使用nc發送數據

root@root:~$ nc -lk 9999
hello
world

countByWindow統計每個滑動窗口內數據條數,要注意的是使用該函數要加上checkpoint機制,代碼實現如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
    dstream = lines.countByWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 18:56:40
    # -------------------------------------------
    # 2
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByWindow聚合每個鍵的值,底層執行的是reduceByKeyAndWindow,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x):
    return x

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數執行指定函數, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒
    dstream = lines.reduceByWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # hello
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByKeyAndWindow是對(K,V)窗口數據相同的K執行對應的fun,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x,y):
    return x+y

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數執行的功能函數fun, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒,
    # 第四個參數設定并行度
    dstream = lines.map(lambda x:(x,1)).reduceByKeyAndWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 2)
    ssc.start()
    ssc.awaitTermination()
            
          

countByValueAndWindow是對窗口數據進行單詞統計,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒, 第三個參數任務并行度
    dstream = lines.countByValueAndWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 3)
    # ('world', 1)
    ssc.start()
    ssc.awaitTermination()
            
          

以上就是所有窗口函數的使用

?

Spark學習目錄:

  • Spark學習實例1(Python):單詞統計 Word Count
  • Spark學習實例2(Python):加載數據源Load Data Source
  • Spark學習實例3(Python):保存數據Save Data
  • Spark學習實例4(Python):RDD轉換 Transformations
  • Spark學習實例5(Python):RDD執行 Actions
  • Spark學習實例6(Python):共享變量Shared Variables
  • Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
  • Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
  • Spark學習實例9(Python):窗口操作 Window Operations

?

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 伊人久久精品一区二区三区 | 久草综合视频 | 成人国产第一区在线观看 | 成人a免费视频播放 | 欧美成视频一theporn | 四虎影视在线观看2022a | 国产在线中文字幕 | 99久久国产视频 | 欧美精品aaa久久久影院 | 色综合亚洲综合网站综合色 | 四虎影视成人 | 中文字幕日韩一区二区 | 天天舔天天干天天操 | 精品成人免费一区二区在线播放 | 欧美操穴视频 | 国内精品视频成人一区二区 | 亚洲乱码视频在线观看 | 国产精品久久久久久久久久一区 | 最新中文字幕在线播放 | 国产一区二区三区在线影院 | 久久国内精品自在自线400部o | 国产精品资源在线 | 亚洲欧美视频二区 | 久久久精品久久视频只有精品 | 欧美亚洲国产精品久久第一页 | 毛片破处| 亚洲精品专区一区二区三区 | 日日干夜夜艹 | 四虎高清在线精品免费观看 | 欧美一区二区在线免费观看 | 性感毛片| 国产精品久久久久影视青草 | 久久蜜月| 亚州色拍拍拍 | 日本一区高清视频 | 中文字幕在线高清 | 久久精品| 天天天天天天操 | 亚洲国产一区二区a毛片日本 | 午夜视频在线观看网站 | 国产精品福利尤物youwu |