亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專(zhuān)欄INFORMATION COLUMN

入門(mén)教程 | 5分鐘從零構(gòu)建第一個(gè) Flink 應(yīng)用

Mike617 / 761人閱讀

摘要:接著我們將數(shù)據(jù)流按照單詞字段即號(hào)索引字段做分組,這里可以簡(jiǎn)單地使用方法,得到一個(gè)以單詞為的數(shù)據(jù)流。得到的結(jié)果數(shù)據(jù)流,將每秒輸出一次這秒內(nèi)每個(gè)單詞出現(xiàn)的次數(shù)。最后一件事就是將數(shù)據(jù)流打印到控制臺(tái),并開(kāi)始執(zhí)行最后的調(diào)用是啟動(dòng)實(shí)際作業(yè)所必需的。

本文轉(zhuǎn)載自 Jark’s Blog ,作者伍翀(云邪),Apache Flink Committer,阿里巴巴高級(jí)開(kāi)發(fā)工程師。 本文將從開(kāi)發(fā)環(huán)境準(zhǔn)備、創(chuàng)建 Maven 項(xiàng)目,編寫(xiě) Flink 程序、運(yùn)行程序等方面講述如何迅速搭建第一個(gè) Flink 應(yīng)用。 在本文中,我們將從零開(kāi)始,教您如何構(gòu)建第一個(gè) Flink 應(yīng)用程序。

開(kāi)發(fā)環(huán)境準(zhǔn)備

Flink 可以運(yùn)行在 Linux, Max OS X, 或者是 Windows 上。為了開(kāi)發(fā) Flink 應(yīng)用程序,在本地機(jī)器上需要有 Java 8.xmaven 環(huán)境。

如果有 Java 8 環(huán)境,運(yùn)行下面的命令會(huì)輸出如下版本信息:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
如果有 maven 環(huán)境,運(yùn)行下面的命令會(huì)輸出如下版本信息:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
另外我們推薦使用 ItelliJ IDEA (社區(qū)免費(fèi)版已夠用)作為 Flink 應(yīng)用程序的開(kāi)發(fā) IDE。Eclipse 雖然也可以,但是 Eclipse 在 Scala 和 Java 混合型項(xiàng)目下會(huì)有些已知問(wèn)題,所以不太推薦 Eclipse。下一章節(jié),我們會(huì)介紹如何創(chuàng)建一個(gè) Flink 工程并將其導(dǎo)入 ItelliJ IDEA。
創(chuàng)建 Maven 項(xiàng)目

我們將使用 Flink Maven Archetype 來(lái)創(chuàng)建我們的項(xiàng)目結(jié)構(gòu)和一些初始的默認(rèn)依賴(lài)。在你的工作目錄下,運(yùn)行如下命令來(lái)創(chuàng)建項(xiàng)目:

mvn archetype:generate 
    -DarchetypeGroupId=org.apache.flink 
    -DarchetypeArtifactId=flink-quickstart-java 
    -DarchetypeVersion=1.6.1 
    -DgroupId=my-flink-project 
    -DartifactId=my-flink-project 
    -Dversion=0.1 
    -Dpackage=myflink 
    -DinteractiveMode=false

你可以編輯上面的 groupId, artifactId, package 成你喜歡的路徑。使用上面的參數(shù),Maven 將自動(dòng)為你創(chuàng)建如下所示的項(xiàng)目結(jié)構(gòu):

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

我們的 pom.xml 文件已經(jīng)包含了所需的 Flink 依賴(lài),并且在 src/main/java 下有幾個(gè)示例程序框架。接下來(lái)我們將開(kāi)始編寫(xiě)第一個(gè) Flink 程序。

編寫(xiě) Flink 程序

啟動(dòng) IntelliJ IDEA,選擇 "Import Project"(導(dǎo)入項(xiàng)目),選擇 my-flink-project 根目錄下的 pom.xml。根據(jù)引導(dǎo),完成項(xiàng)目導(dǎo)入。

在 src/main/java/myflink 下創(chuàng)建 SocketWindowWordCount.java 文件:

package myflink;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

  }
}

現(xiàn)在這程序還很基礎(chǔ),我們會(huì)一步步往里面填代碼。注意下文中我們不會(huì)將 import 語(yǔ)句也寫(xiě)出來(lái),因?yàn)?IDE會(huì)自動(dòng)將他們添加上去。在本節(jié)末尾,我會(huì)將完整的代碼展示出來(lái),如果你想跳過(guò)下面的步驟,可以直接將最后的完整代碼粘到編輯器中。

Flink 程序的第一步是創(chuàng)建一個(gè) StreamExecutionEnvironment 。這是一個(gè)入口類(lèi),可以用來(lái)設(shè)置參數(shù)和創(chuàng)建數(shù)據(jù)源以及提交任務(wù)。所以讓我們把它添加到 main 函數(shù)中:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

下一步我們將創(chuàng)建一個(gè)從本地端口號(hào) 9000 的 socket 中讀取數(shù)據(jù)的數(shù)據(jù)源:

DataStream text = env.socketTextStream("localhost", 9000, "
");

這創(chuàng)建了一個(gè)字符串類(lèi)型的 DataStreamDataStream 是 Flink 中做流處理的核心 API,上面定義了非常多常見(jiàn)的操作(如,過(guò)濾、轉(zhuǎn)換、聚合、窗口、關(guān)聯(lián)等)。在本示例中,我們感興趣的是每個(gè)單詞在特定時(shí)間窗口中出現(xiàn)的次數(shù),比如說(shuō)5秒窗口。為此,我們首先要將字符串?dāng)?shù)據(jù)解析成單詞和次數(shù)(使用Tuple2表示),第一個(gè)字段是單詞,第二個(gè)字段是次數(shù),次數(shù)初始值都設(shè)置成了1。我們實(shí)現(xiàn)了一個(gè)flatmap,因?yàn)橐恍袛?shù)據(jù)中可能有多個(gè)單詞。

DataStream> wordCounts = text
        .flatMap(new FlatMapFunction>() {
          @Override
          public void flatMap(String value, Collector> out) {
            for (String word : value.split("s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });

接著我們將數(shù)據(jù)流按照單詞字段(即0號(hào)索引字段)做分組,這里可以簡(jiǎn)單地使用 keyBy(int index)方法,得到一個(gè)以單詞為 key 的Tuple2數(shù)據(jù)流。然后我們可以在流上指定想要的窗口,并根據(jù)窗口中的數(shù)據(jù)計(jì)算結(jié)果。在我們的例子中,我們想要每5秒聚合一次單詞數(shù),每個(gè)窗口都是從零開(kāi)始統(tǒng)計(jì)的。

DataStream> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

第二個(gè)調(diào)用的 .timeWindow()指定我們想要5秒的翻滾窗口(Tumble)。第三個(gè)調(diào)用為每個(gè)key每個(gè)窗口指定了sum聚合函數(shù),在我們的例子中是按照次數(shù)字段(即1號(hào)索引字段)相加。得到的結(jié)果數(shù)據(jù)流,將每5秒輸出一次這5秒內(nèi)每個(gè)單詞出現(xiàn)的次數(shù)。

最后一件事就是將數(shù)據(jù)流打印到控制臺(tái),并開(kāi)始執(zhí)行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

最后的 env.execute調(diào)用是啟動(dòng)實(shí)際Flink作業(yè)所必需的。所有算子操作(例如創(chuàng)建源、聚合、打?。┲皇菢?gòu)建了內(nèi)部算子操作的圖形。只有在execute()被調(diào)用時(shí)才會(huì)在提交到集群上或本地計(jì)算機(jī)上執(zhí)行。

下面是完整的代碼,部分代碼經(jīng)過(guò)簡(jiǎn)化(代碼在 GitHub 上也能訪問(wèn)到):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // 創(chuàng)建 execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 通過(guò)連接 socket 獲取輸入數(shù)據(jù),這里連接到本地9000端口,如果9000端口已被占用,請(qǐng)換一個(gè)端口
    DataStream text = env.socketTextStream("localhost", 9000, "
");

    // 解析數(shù)據(jù),按 word 分組,開(kāi)窗,聚合
    DataStream> windowCounts = text
        .flatMap(new FlatMapFunction>() {
          @Override
          public void flatMap(String value, Collector> out) {
            for (String word : value.split("s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    // 將結(jié)果打印到控制臺(tái),注意這里使用的是單線程打印,而非多線程
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }
}
運(yùn)行程序

要運(yùn)行示例程序,首先我們?cè)诮K端啟動(dòng) netcat 獲得輸入流:

nc -lk 9000

如果是 Windows 平臺(tái),可以通過(guò) nmap.org/ncat/ 安裝 ncat 然后運(yùn)行:

ncat -lk 9000

然后直接運(yùn)行SocketWindowWordCount的 main 方法。

只需要在 netcat 控制臺(tái)輸入單詞,就能在 SocketWindowWordCount 的輸出控制臺(tái)看到每個(gè)單詞的詞頻統(tǒng)計(jì)。如果想看到大于1的計(jì)數(shù),請(qǐng)?jiān)?秒內(nèi)反復(fù)鍵入相同的單詞。

Cheers !

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/6850.html

相關(guān)文章

  • 《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows

    摘要:在每個(gè)事件上,觸發(fā)器都可以決定觸發(fā)即清除刪除窗口并丟棄其內(nèi)容,或者啟動(dòng)并清除窗口。請(qǐng)注意,指定的觸發(fā)器不會(huì)添加其他觸發(fā)條件,但會(huì)替換當(dāng)前觸發(fā)器。結(jié)論對(duì)于現(xiàn)代流處理器來(lái)說(shuō),支持連續(xù)數(shù)據(jù)流上的各種類(lèi)型的窗口是必不可少的。 showImg(https://segmentfault.com/img/remote/1460000017892799?w=1280&h=720); 前言 目前有許多數(shù)...

    jifei 評(píng)論0 收藏0
  • Flink入門(mén)

    摘要:簡(jiǎn)介是一個(gè)面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開(kāi)源計(jì)算平臺(tái),提供支持流處理和批處理兩種類(lèi)型應(yīng)用的功能。每一個(gè)數(shù)據(jù)流起始于一個(gè)或多個(gè),并終止于一個(gè)或多個(gè)。 Flink簡(jiǎn)介 Apache Flink 是一個(gè)面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開(kāi)源計(jì)算平臺(tái),提供支持流處理和批處理兩種類(lèi)型應(yīng)用的功能。 Apache Flink的前身是柏林理工大學(xué)一個(gè)研究性項(xiàng)目,在2014被Apache孵化器...

    余學(xué)文 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<