亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

示例:python模擬日志生成+Flume+Kafka+Spark

系統 1982 0

生成模擬數據

  1. 編寫 generate_log.py
            
              
                #coding=UTF-8
              
              
                import
              
               random

              
                import
              
               time

url_paths
              
                =
              
              
                [
              
              
                "class/112.html"
              
              
                ,
              
              
                "class/128.html"
              
              
                ,
              
              
                "class/145.html"
              
              
                ,
              
              
                "class/130.html"
              
              
                ,
              
              
                "class/146.html"
              
              
                ,
              
              
                "class/131.html"
              
              
                ,
              
              
                "learn/821"
              
              
                ,
              
              
                "course/list"
              
              
                ]
              
              

ip_slices
              
                =
              
              
                [
              
              
                132
              
              
                ,
              
              
                156
              
              
                ,
              
              
                124
              
              
                ,
              
              
                10
              
              
                ,
              
              
                29
              
              
                ,
              
              
                167
              
              
                ,
              
              
                143
              
              
                ,
              
              
                187
              
              
                ,
              
              
                30
              
              
                ,
              
              
                46
              
              
                ,
              
              
                55
              
              
                ,
              
              
                63
              
              
                ,
              
              
                72
              
              
                ,
              
              
                87
              
              
                ,
              
              
                98
              
              
                ,
              
              
                168
              
              
                ]
              
              

http_referers
              
                =
              
              
                [
              
              
                "https://www.baidu.com/s?wd={query}"
              
              
                ,
              
              
                "https://www.sogou.com/web?query={query}"
              
              
                ,
              
              
                "https://cn.bing.com/search?q={query}"
              
              
                ,
              
              
                "https://www.so.com/s?q={query}"
              
              
                ]
              
              

search_keyword
              
                =
              
              
                [
              
              
                "spark sql實戰"
              
              
                ,
              
              
                "hadoop 基礎"
              
              
                ,
              
              
                "storm實戰"
              
              
                ,
              
              
                "spark streaming實戰"
              
              
                ]
              
              

status_code
              
                =
              
              
                [
              
              
                "200"
              
              
                ,
              
              
                "404"
              
              
                ,
              
              
                "500"
              
              
                ]
              
              
                def
              
              
                sample_status_code
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               random
              
                .
              
              sample
              
                (
              
              status_code
              
                ,
              
              
                1
              
              
                )
              
              
                [
              
              
                0
              
              
                ]
              
              
                def
              
              
                sample_referer
              
              
                (
              
              
                )
              
              
                :
              
              
                if
              
               random
              
                .
              
              uniform
              
                (
              
              
                0
              
              
                ,
              
              
                1
              
              
                )
              
              
                >
              
              
                0.2
              
              
                :
              
              
                return
              
              
                "-"
              
              
    refer_str
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              http_referers
              
                ,
              
              
                1
              
              
                )
              
              
    query_str
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              search_keyword
              
                ,
              
              
                1
              
              
                )
              
              
                return
              
               refer_str
              
                [
              
              
                0
              
              
                ]
              
              
                .
              
              
                format
              
              
                (
              
              query
              
                =
              
              query_str
              
                [
              
              
                0
              
              
                ]
              
              
                )
              
              
                def
              
              
                sample_url
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               random
              
                .
              
              sample
              
                (
              
              url_paths
              
                ,
              
              
                1
              
              
                )
              
              
                [
              
              
                0
              
              
                ]
              
              
                def
              
              
                sample_ip
              
              
                (
              
              
                )
              
              
                :
              
              
                slice
              
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              ip_slices
              
                ,
              
              
                4
              
              
                )
              
              
                return
              
              
                "."
              
              
                .
              
              join
              
                (
              
              
                [
              
              
                str
              
              
                (
              
              item
              
                )
              
              
                for
              
               item 
              
                in
              
              
                slice
              
              
                ]
              
              
                )
              
              
                def
              
              
                generate_log
              
              
                (
              
              count
              
                =
              
              
                10
              
              
                )
              
              
                :
              
              
    time_str
              
                =
              
              time
              
                .
              
              strftime
              
                (
              
              
                "%Y-%m-%d %H:%M:%S"
              
              
                ,
              
              time
              
                .
              
              localtime
              
                (
              
              
                )
              
              
                )
              
              

    f
              
                =
              
              
                open
              
              
                (
              
              
                "C:/Users/DaiRenLong/Desktop/streaming_access.log"
              
              
                ,
              
              
                "w+"
              
              
                )
              
              
                while
              
               count 
              
                >=
              
              
                1
              
              
                :
              
              
        query_log
              
                =
              
              
                "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}"
              
              
                .
              
              
                format
              
              
                (
              
              url
              
                =
              
              sample_url
              
                (
              
              
                )
              
              
                ,
              
              ip
              
                =
              
              sample_ip
              
                (
              
              
                )
              
              
                ,
              
              refer
              
                =
              
              sample_referer
              
                (
              
              
                )
              
              
                ,
              
              status_code
              
                =
              
              sample_status_code
              
                (
              
              
                )
              
              
                ,
              
              local_time
              
                =
              
              time_str
              
                )
              
              
                print
              
              
                (
              
              query_log
              
                )
              
              
        f
              
                .
              
              write
              
                (
              
              query_log
              
                +
              
              
                "\n"
              
              
                )
              
              
        count
              
                =
              
              count
              
                -
              
              
                1
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                # 每一分鐘生成一次日志信息
              
              
                while
              
              
                True
              
              
                :
              
              
        generate_log
              
                (
              
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              
                60
              
              
                )
              
            
          
  1. 日志文件對接flume==>kafka
    Flume配置文件: https://blog.csdn.net/drl_blogs/article/details/95192574#execkafkaconf_1
  2. 運行flume
            
              flume-ng agent \
--name exec-memory-logger \
--conf conf 
              
                $FLUME_HOME
              
              /conf \
--conf-file 
              
                $FLUME_HOME
              
              /conf/streaming_project.conf \
-Dflume.root.logger
              
                =
              
              INFO,console 
              
                &
              
            
          
  1. 運行kafka消費者
            
               kafka-console-consumer.sh \
 --zookeeper hadoop01:2181 \
 --topic kafka_streaming_topic

            
          
  1. 運行python文件測試
            
               python generate_log.py

            
          
  1. 查看kafka消費者消費者是否有信息

  2. 編寫代碼打通通道

            
              
                import
              
               org
              
                .
              
              apache
              
                .
              
              log4j
              
                .
              
              
                {
              
              Level
              
                ,
              
               Logger
              
                }
              
              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              SparkConf

              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              streaming
              
                .
              
              kafka
              
                .
              
              KafkaUtils

              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              streaming
              
                .
              
              
                {
              
              Seconds
              
                ,
              
               StreamingContext
              
                }
              
              

object kafka_Receiver_streaming 
              
                {
              
              
  Logger
              
                .
              
              
                getLogger
              
              
                (
              
              
                "org"
              
              
                )
              
              
                .
              
              
                setLevel
              
              
                (
              
              Level
              
                .
              
              WARN
              
                )
              
              
  def 
              
                main
              
              
                (
              
              args
              
                :
              
               Array
              
                [
              
              String
              
                ]
              
              
                )
              
              
                :
              
               Unit 
              
                =
              
              
                {
              
              
    val sparkConf 
              
                =
              
              
                new
              
              
                SparkConf
              
              
                (
              
              
                )
              
              
                .
              
              
                setAppName
              
              
                (
              
              
                "kafka_Receiver_streaming"
              
              
                )
              
              
                .
              
              
                setMaster
              
              
                (
              
              
                "local[*]"
              
              
                )
              
              
                .
              
              
                set
              
              
                (
              
              
                "spark.port.maxRetries"
              
              
                ,
              
              
                "100"
              
              
                )
              
              

    val ssc 
              
                =
              
              
                new
              
              
                StreamingContext
              
              
                (
              
              sparkConf
              
                ,
              
              
                Seconds
              
              
                (
              
              
                60
              
              
                )
              
              
                )
              
              

    val messages 
              
                =
              
               KafkaUtils
              
                .
              
              
                createStream
              
              
                (
              
              ssc
              
                ,
              
              
                "hadoop01:2181"
              
              
                ,
              
              
                "test"
              
              
                ,
              
              
                Map
              
              
                (
              
              
                "kafka_streaming_topic"
              
              
                -
              
              
                >
              
              
                1
              
              
                )
              
              
                )
              
              
    messages
              
                .
              
              
                map
              
              
                (
              
              _
              
                .
              
              _2
              
                )
              
              
                .
              
              
                count
              
              
                (
              
              
                )
              
              
                .
              
              
                print
              
              
                (
              
              
                )
              
              

    ssc
              
                .
              
              
                start
              
              
                (
              
              
                )
              
              
    ssc
              
                .
              
              
                awaitTermination
              
              
                (
              
              
                )
              
              
                }
              
              
                }
              
            
          
  1. 運行代碼查看結果

更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 久久色精品| 韩国日本美国免费毛片 | 久久中文字幕亚洲精品最新 | 久久午夜剧场 | 亚洲天堂777 | 精品福利视频第一 | 99综合网| 欧美丰满大乳大屁股毛片 | 久久久久久中文字幕 | 一本色道久久综合狠狠躁 | 成人在线视频网址 | 亚洲国产天堂在线mv网站 | 四虎精品福利视频精品 | 亚洲视频在线免费观看 | 日本免费人成黄页网观看视频 | 成人精品亚洲 | 亚洲国产一区二区在线 | 亚洲成aⅴ人片在线观 | 香蕉视频在线视频 | 九九视频在线播放 | 亚洲狠狠婷婷综合久久久图片 | 日日夜夜人人 | 婷婷综合激情五月中文字幕 | 久草视频在线免费看 | 91成人在线播放 | 91在线视频在线 | 日韩免费精品一级毛片 | 亚洲自拍第二页 | 亚洲高清视频在线 | 色资源在线观看 | 国产精品麻豆一区二区 | 色另类| 国产不卡视频在线观看 | 伊人久久大香线蕉亚洲 | 99精品中文字幕 | 国产女主播喷出白浆视频 | 一级女人18片毛片免费视频 | 亚洲片在线 | www.一区 | 日本久久免费 | 久久精品日本免费线 |