亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

《從0到1學(xué)習(xí)Flink》—— Data Source 介紹

XFLY / 932人閱讀

摘要:指定了該迭代器返回元素的類型。這可能導(dǎo)致節(jié)點(diǎn)故障后的恢復(fù)速度較慢,因?yàn)樵撟鳂I(yè)將從最后一個檢查點(diǎn)恢復(fù)讀取。監(jiān)聽的端口過來的數(shù)據(jù)這個在從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡單程序入門文章里用的就是基于的程序。取消一個,也即將中的循環(huán)元素的行為終止。

前言

Data Sources 是什么呢?就字面意思其實(shí)就可以知道:數(shù)據(jù)來源。

Flink 做為一款流式計(jì)算框架,它可用來做批處理,即處理靜態(tài)的數(shù)據(jù)集、歷史的數(shù)據(jù)集;也可以用來做流處理,即實(shí)時的處理些實(shí)時數(shù)據(jù)流,實(shí)時的產(chǎn)生數(shù)據(jù)流結(jié)果,只要數(shù)據(jù)源源不斷的過來,F(xiàn)link 就能夠一直計(jì)算下去,這個 Data Sources 就是數(shù)據(jù)的來源地。

Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 來為你的程序添加數(shù)據(jù)來源。

Flink 已經(jīng)提供了若干實(shí)現(xiàn)好了的 source functions,當(dāng)然你也可以通過實(shí)現(xiàn) SourceFunction 來自定義非并行的 source 或者實(shí)現(xiàn) ParallelSourceFunction 接口或者擴(kuò)展 RichParallelSourceFunction 來自定義并行的 source,

Flink

StreamExecutionEnvironment 中可以使用以下幾個已實(shí)現(xiàn)的 stream sources,

總的來說可以分為下面幾大類:

基于集合

1、fromCollection(Collection) - 從 Java 的 Java.util.Collection 創(chuàng)建數(shù)據(jù)流。集合中的所有元素類型必須相同。

2、fromCollection(Iterator, Class) - 從一個迭代器中創(chuàng)建數(shù)據(jù)流。Class 指定了該迭代器返回元素的類型。

3、fromElements(T ...) - 從給定的對象序列中創(chuàng)建數(shù)據(jù)流。所有對象類型必須相同。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream input = env.fromElements(
    new Event(1, "barfoo", 1.0),
    new Event(2, "start", 2.0),
    new Event(3, "foobar", 3.0),
    ...
);

4、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創(chuàng)建并行數(shù)據(jù)流。Class 指定了該迭代器返回元素的類型。

5、generateSequence(from, to) - 創(chuàng)建一個生成指定區(qū)間范圍內(nèi)的數(shù)字序列的并行數(shù)據(jù)流。

基于文件

1、readTextFile(path) - 讀取文本文件,即符合 TextInputFormat 規(guī)范的文件,并將其作為字符串返回。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream text = env.readTextFile("file:///path/to/file");

2、readFile(fileInputFormat, path) - 根據(jù)指定的文件輸入格式讀取文件(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內(nèi)部調(diào)用的方法。它根據(jù)給定的 fileInputFormat 和讀取路徑讀取文件。根據(jù)提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監(jiān)測給定路徑的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應(yīng)文件的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE)。你可以通過 pathFilter 進(jìn)一步排除掉需要處理的文件。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

實(shí)現(xiàn):

在具體實(shí)現(xiàn)上,F(xiàn)link 把文件讀取過程分為兩個子任務(wù),即目錄監(jiān)控和數(shù)據(jù)讀取。每個子任務(wù)都由多帶帶的實(shí)體實(shí)現(xiàn)。目錄監(jiān)控由單個非并行(并行度為1)的任務(wù)執(zhí)行,而數(shù)據(jù)讀取由并行運(yùn)行的多個任務(wù)執(zhí)行。后者的并行性等于作業(yè)的并行性。單個目錄監(jiān)控任務(wù)的作用是掃描目錄(根據(jù) watchType 定期掃描或僅掃描一次),查找要處理的文件并把文件分割成切分片(splits),然后將這些切分片分配給下游 reader。reader 負(fù)責(zé)讀取數(shù)據(jù)。每個切分片只能由一個 reader 讀取,但一個 reader 可以逐個讀取多個切分片。

重要注意:

如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當(dāng)文件被修改時,其內(nèi)容將被重新處理。這會打破“exactly-once”語義,因?yàn)樵谖募┪哺郊訑?shù)據(jù)將導(dǎo)致其所有內(nèi)容被重新處理。

如果 watchType 設(shè)置為 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次然后退出,而不等待 reader 完成文件內(nèi)容的讀取。當(dāng)然 reader 會繼續(xù)閱讀,直到讀取所有的文件內(nèi)容。關(guān)閉 source 后就不會再有檢查點(diǎn)。這可能導(dǎo)致節(jié)點(diǎn)故障后的恢復(fù)速度較慢,因?yàn)樵撟鳂I(yè)將從最后一個檢查點(diǎn)恢復(fù)讀取。

基于 Socket:

socketTextStream(String hostname, int port) - 從 socket 讀取。元素可以用分隔符切分。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream> dataStream = env
        .socketTextStream("localhost", 9999) // 監(jiān)聽 localhost 的 9999 端口過來的數(shù)據(jù)
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

這個在 《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門 文章里用的就是基于 Socket 的 Word Count 程序。

自定義:

addSource - 添加一個新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(...)) 以從 Apache Kafka 讀取數(shù)據(jù)

說下上面幾種的特點(diǎn)吧

1、基于集合:有界數(shù)據(jù)集,更偏向于本地測試用

2、基于文件:適合監(jiān)聽文件修改并讀取其內(nèi)容

3、基于 Socket:監(jiān)聽主機(jī)的 host port,從 Socket 中獲取數(shù)據(jù)

4、自定義 addSource:大多數(shù)的場景數(shù)據(jù)都是無界的,會源源不斷的過來。比如去消費(fèi) Kafka 某個 topic 上的數(shù)據(jù),這時候就需要用到這個 addSource,可能因?yàn)橛玫谋容^多的原因吧,F(xiàn)link 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 這個基礎(chǔ)類,它是 Flink Kafka 消費(fèi)的最根本的類。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream input = env
        .addSource(
            new FlinkKafkaConsumer011<>(
                parameterTool.getRequired("input-topic"), //從參數(shù)中獲取傳進(jìn)來的 topic 
                new KafkaEventSchema(),
                parameterTool.getProperties())
            .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));

Flink 目前支持如下圖里面常見的 Source:

如果你想自己自定義自己的 Source 呢?

那么你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口,它繼承自一個標(biāo)記接口(空接口)Function。

SourceFunction 定義了兩個接口方法:

1、run : 啟動一個 source,即對接一個外部數(shù)據(jù)源然后 emit 元素形成 stream(大部分情況下會通過在該方法里運(yùn)行一個 while 循環(huán)的形式來產(chǎn)生 stream)。

2、cancel : 取消一個 source,也即將 run 中的循環(huán) emit 元素的行為終止。

正常情況下,一個 SourceFunction 實(shí)現(xiàn)這兩個接口方法就可以了。其實(shí)這兩個接口方法也固定了一種實(shí)現(xiàn)模板。

比如,實(shí)現(xiàn)一個 XXXSourceFunction,那么大致的模板是這樣的:(直接拿 FLink 源碼的實(shí)例給你看看)

最后

本文主要講了下 Flink 的常見 Source 有哪些并且簡單的提了下如何自定義 Source。

關(guān)注我

轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/28/flink-sources/

另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。

相關(guān)文章

1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹

2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門

3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解

4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹

5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?

6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹

7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/72072.html

相關(guān)文章

  • 01學(xué)習(xí)Flink》—— Data Sink 介紹

    摘要:從上圖可以看到接口有方法,它有一個抽象類。上面的那些自帶的可以看到都是繼承了抽象類,實(shí)現(xiàn)了其中的方法,那么我們要是自己定義自己的的話其實(shí)也是要按照這個套路來做的。 showImg(https://segmentfault.com/img/remote/1460000016956595); 前言 再上一篇文章中 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 講解了 Fli...

    thursday 評論0 收藏0
  • 01學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)

    摘要:這些切片稱為窗口。函數(shù)允許對常規(guī)數(shù)據(jù)流進(jìn)行分組。通常,這是非并行數(shù)據(jù)轉(zhuǎn)換,因?yàn)樗诜欠謪^(qū)數(shù)據(jù)流上運(yùn)行。 showImg(https://segmentfault.com/img/remote/1460000017874226?w=1920&h=1271); 前言 在第一篇介紹 Flink 的文章 《《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹》 中就說過 Flink ...

    oujie 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<