基本介紹
Last updated
Elasticserach 內建的 Ingest Pipeline 基本介紹
如何使用 Ingest Pipeline
使用 Ingest Pipeline 時的注意事項
Ingest Pipeline (擷取管道) 是一個內建在 Elasticsearch 中,文件在進入 Index 前的資料轉換 (Transformation) 的工具,主要的任務就是針對透過 Indexing RESTful API 傳入的文件,在真正進入 Elasticsearch Indexing 處理之前,先進行前處理,這個前處理可以像是以下幾種例子:
將原始的資料豐富化 (enrich),透過查找 Elasticsearch 裡存放在別的 Index 裡的相關資料,加入到文件之中,來豐富原有的文件。
將日期格式正確的從文件中的某個欄位擷取出來,讓 Elasticsearch 的 @timestamp
有正確的時間。
將 IP 的欄位,透過反查 GeoIP 的資料庫,加入 GeoLocation 的資訊在文件中,以利於之後能使用地圖檢視資料。
將文字內容,依照特定的格式,拆解成結構化的 JSON 欄位與值。
將 URL 拆解成包含 path
、scheme
、port
、domain
、query
…各個欄位。
將 URL 中 Query String 裡的 Key / Value,轉成結構化的 JSON 欄位與值。
當某個條件成立時,填加某個固定值。
上圖是 Ingest Pipeline 運作時的主要流程,在這過程中,我們分別解釋每個階段的運作及操作方式:
Incoming Documents (傳入的文件): 在使用 Indexing API、或是 _bulk
API 將文件傳入至 Elasticsearch 準備進行 Indexing 時,可以指定要使用哪一組預先定義好的 Ingest Pipeline。
Ingest Pipeline (擷取管道): Elasticsearch 在到 Indexing 的請求時,如果有指定 Ingest Pipeline,Coordinator (協調者) Node 會把這個請求,交給 ingest
node,透過定義好的 Pipeline 設定,經由當中指定的各種 Processor (處理器) 一步一步的將資料進行處理。
Target Index (目標索引): Ingest Pipeline 在處理完之後,會將最終的文件,透過 Coordinator 傳送到 Primary Shard 所在的 Node,進行 Indexing 後續的處理。
接下來將會依照實際準備時的步驟,進行說明。
在文件進入 Elasticsearch 之前,我們必須先準備好 Ingest Pipeline 的定義,這邊主要是透過 _ingest
的 API 進行設定:
<pipeline>
:是自己取的 pipeline 名稱
提供一個實際的範例如下:
Pipeline 的名字是 my-pipeline-id
。
version
是提供使用者自己記錄及參考,與 ingest pipeline 本身的運作功能無關,是選用的欄位。
processors
裡面指定 1 至多個 processors,這部份會是資料 Transform (轉換) 的主要處理,每個 processors 會依照先後順序來執行,一個 processor 做完後會將輸出交給下一個 processor 進行處理,也就是這個功能取名 pipeline 的原因,Elasticsearch 中內建 30 多個 processors,詳細可參考 官方文件 - Ingest Processor Reference。
_meta
是提供給使用者自己存放自己想要額外加入的資訊所使用的,如果 pipeline 是由程式或其他機制在管理時,可以額外記錄一些參考的資訊。
除了建立 pipeline 的這個 API,Ingest API 總共有提供:
Create or update pipeline
Get pipeline
Delete pipeline
可以參考 官方文件 - Ingest APIs 的使用說明。
另外 Kibana 也有提供 UI 的設定畫面,可以在 Kibana > Stake Management > Ingest > Ingest Node Pipelines 建立或管理 Ingest Pipeline:
在建立 Create Pipeline 時,就能夠使透過 Add a processor 進入以下的畫面選擇要使用的 processor 並進行相關的設定。
當我們依照需求定義好 Ingest Pipeline 之後,我們可以透過 _simulate
API,並提供測試的文件,確認一下 Ingest Pipeline 的執行結果。
_simulate
API 有提供兩種模式,第一種是針對還沒有建立 Pipeline 時,一併透過 API 模擬指定的 Pipeline 定義 + 測試文件,另一種是已經建立好 Pipelien 的定義,提供測試文件來試用。
我們針對 my-pipeline-id
這個 pipeline 來測試,並提供兩份文件:
可以得到回傳結果:
我們不用指定 Pipeline 名稱,直接使用 _ingest/pipeline/_simulate
的 API,並且在 Request Body 中帶入 pipeline
的定義:
以下是 Simulate 的回傳結果:
在 Kibana 之中,也有對應的 Test Pipeline 功能,可以直接填入測試的文件,來檢驗 Pipeline 的運作結果。
當上述的方式建立好 Ingest Pipeline 之後,我們將 Document Indexing 進入 Elasticsearch 時,就可以指定要使用 Ingest Pipeline,以下是常用的幾種方式:
使用 Index API 時,指定 pipeline
的名字
使用 Bulk API 時,同樣也可以指定 pipeline
的名字
使用 Update by Query 時,也可以指定 pipeline
的名字
Reindex 時也可以指定 pipeline
的名字
在 Index Setting 中,可以透過以下的設定,來決定資料寫入這個 Index 時,要透過 ingest pipeline 來處理:
index.default_pipeline
:如果 request 沒有帶入指定的 pipeline
,就會依照這個設定來執行。
index.final_pipeline
:這是不論有沒有其他經由 request 帶入的 pipeline
或是 default_pipeline
的設定,在最後都一定會執行的的 pipeline 設定。(也就是如果有其他指定的 pipeline,這兩種 pipeline 都會被執行)
因此也可以透過 Index Template 指定 Index setting 中的這兩個設定值。
其他 Beats、Logstash,也都會有對應的設定,能夠在資料透過 Index API 或是 Bulk API 寫入時,指定要使用的 Ingest Pipeline,詳細請參考這些產品的官方文件說明。
Elasticsearch Cluster 中,至少要有一個有啟動 ingest
角色的 Node,如果 Ingest 的工作量很繁重的話,建議安排專門處理 Ingest 的 Node 來進行 Ingest 任務的處理,又或是使用 Logstash 等其他 ETL 工具,避免佔用資源而影響 Elasticsearch 其他功能的運作。
如果有啟用 Security 的功能,會需要擁有 manage_pipeline
的權限,如果要從 Kibana 的 Ingest Node Pipeline 畫面來操作 Ingest Pipeline 的功能的話,另外還會需要 cluster:monitor/nodes/info
的權限。
在使用 Pipeline 時,版本管理也會是很重要的一件事,為了能更有效率的避免 Pipeline 的定義是舊版,而造成資料 Indexing 時是以非預期的方式處理,善用 version
的版本號碼,並且將 Pipeline 的定義進行版控管理,在佈署或除錯時,也能多透過確認 Pipeline 的 version
來確保版本的正確性。
以上介紹了 Elasticsearch Ingest Pipeline 的基本說明,接下來會以實際的例子進行介紹,說明 Ingest Pipeline 如何協助我們將 Log 結構化。