喬叔的 Elastic Stack 專業教育訓練
  • 喬叔的 Elastic Stack 專業教育訓練
  • 🧑關於喬叔 (Joe Wu)
  • Elastic 課程公開班
    • 🎯Elasticsearch 基礎實務班
      • 💯學員課後回饋
    • 🆕Elasticsearch 進階運維班
      • 💯學員課後回饋
    • Elasticsearch 進階開發班
    • Elastic Stack 基礎實務班
    • Elastic Observability 基礎實務班
    • 📩課程許願池
  • 技術分享
    • 📗喬叔帶你上手 Elastic Stack
      • 前言
      • Elastic Cloud 如何建立 Deployment
        • ES Node 的種類
        • 配置的選擇
      • Index 建立前你該知道的
        • ES Index 如何被建立
        • ES 的超前佈署 - Dynamic Mapping
        • ES 的超前佈署 - Index Template
        • ES Index 的別名 (Alias)
        • 管理你的 Index - Kibana Index
      • 管理 Index 的 Best Practices
        • Shard 的數量與 Rollover & Shrink API
        • 三溫暖架構 - Hot Warm Cold Architecture
        • Index Lifecycle Management (ILM)
        • Rollup
        • Transform
        • Snapshot Lifecycle Management (SLM)
        • 總結
      • Elastic Cloud 比免費版還多的功能
        • Elastic Stack 的方案比較與銷售方式
        • Centralized Beats Management
        • Centralized Pipeline Management
        • Watcher
        • Elasticsearch Token Service
        • Multi-stack monitoring & Automatic stack issue alerts
      • 向 App Search 學習怎麼用 Elasticsearch
        • 揭開 App Search 的面紗
        • Engine 的 Index Settings 篇
        • Engine 的 Mapping 篇
        • Engine 的 Search 基礎剖析篇
        • Engine 的 Search 進階剖析篇
      • Elasticsearch 的優化技巧
        • Indexing 索引效能優化
        • Searching 搜尋效能優化
        • Index 的儲存空間最佳化
        • Shard 的最佳化管理
      • 完賽心得
    • 📘喬叔帶你上手 Elastic Stack - 探索與實踐 Observability 系列
      • 前言 & 淺談 Observability
      • Elastic 的 Observability 解決方案
      • Uptime - 掌握系統的生命徵象
        • 我們要觀測的生命徵象是什麼?
        • 使用 Heartbeat 收集系統生命徵象數據
        • 透過 Kibana 觀看心電圖及設定警報
        • 使用合成監控 (Synthetics Monitor) 從使用者情境驗證服務的運作狀態
      • Metrics - 觀察系統的健康指標
        • Metrics 與 Metricbeat 的基本介紹
        • 使用 Metricbeat 掌握 Elastic Stack 的健康狀態
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 Host 篇
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 Docker 篇
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 Kubernetes 篇
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 AWS 篇
      • Logs - 挖掘系統內部發生的狀況
        • Logs 與 Filebeat 的基本介紹
        • 使用 Filebeat 應該要了解的設計細節與原理
        • 透過 Filebeat 收集 Elastic Stack 中各種服務的細節資訊
        • 透過 Filebeat 收集 Infrastructure 中各種服務的細節資訊
      • Traces - 觀察應用程式的效能瓶頸
        • Elastic APM 基本介紹
        • 使用 APM-Integratoin-Testing 建立 Elastic APM 的模擬環境
        • 如何在 Kibana 使用 APM UI
        • 使用 APM Server 來收集 APM 數據
        • 透過 APM Agents 收集並傳送後端服務運作的記錄
        • 透過真實使用者監控 (RUM, Real User Monitoring) 來改善使用者體驗
      • 建立結構化的 Log
        • Elastic Common Schema 結構化 Log 的規範
        • Elasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手
          • 基本介紹
          • 各種常用的 Processor
          • Enrich 資料與例外處理
      • 有效的使用 Observability 的資料
        • 透過 Machine Learning 發現異常的問題
        • 使用 Kibana Alerts 主動通知異常狀況
        • 資料的生命週期管理
        • 使用 Elastic Observability 追縱及觀察問題的心得
      • 完賽心得
    • 😀Elasticsearch 技術分享小品
      • 🤖Elastic 與 AI
        • Elasticsearch Inference API 讓我們直接在 ES 裡運用 OpenAI Completion API
    • 🎥線上分享
      • 喬叔 Elasticsearch Index 管理與效能優化技巧
      • Elastic Certification 認證經驗分享
    • 🛠️workshop
      • 如何在 Elasticsearch 實現敏捷的資料建模與管理 @ DevOpsDays 2023
        • 工作坊實作內容
      • Elastic Observability 實作體驗坊 @ DevOpsDays 2022
        • 行前準備
        • 工作坊實作內容
      • 當 Elasticsearch 搜尋引擎遇上 AI @ HelloWordDevConference 2024
        • 投影片
        • Elasticsearch 環境準備
        • Google Colab 環境準備
        • 工作坊操作說明
        • ElasticSearch Relevance Engine (ESRE)
    • ⬆️Elastic Stack 版本升級記錄
      • 🔍Elasticsearch
  • 其他專業服務
    • 👩‍🎓企業包班 | 企業內訓
    • 👨‍💼顧問服務
    • 🈺專案合作
    • 🧩Elastic 授權代理
  • 相關連結
    • Facebook 粉絲頁
Powered by GitBook
On this page
  • 本篇學習重點
  • Ingest Pipeline 的功用
  • Ingest Pipeline 的運作及使用方式
  • 定義 Ingest Pipeline
  • 上線前使用 Simulate 模擬一下
  • Indexing 資料時,使用 Ingest Pipeline 的方法
  • 使用 Ingest Pipeline 的注意事項
  • 參考資料
  1. 技術分享
  2. 喬叔帶你上手 Elastic Stack - 探索與實踐 Observability 系列
  3. 建立結構化的 Log
  4. Elasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手

基本介紹

PreviousElasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手Next各種常用的 Processor

Last updated 2 years ago

本篇學習重點

  • Elasticserach 內建的 Ingest Pipeline 基本介紹

  • 如何使用 Ingest Pipeline

  • 使用 Ingest Pipeline 時的注意事項

Ingest Pipeline 的功用

Ingest Pipeline (擷取管道) 是一個內建在 Elasticsearch 中,文件在進入 Index 前的資料轉換 (Transformation) 的工具,主要的任務就是針對透過 Indexing RESTful API 傳入的文件,在真正進入 Elasticsearch Indexing 處理之前,先進行前處理,這個前處理可以像是以下幾種例子:

  • 將原始的資料豐富化 (enrich),透過查找 Elasticsearch 裡存放在別的 Index 裡的相關資料,加入到文件之中,來豐富原有的文件。

  • 將日期格式正確的從文件中的某個欄位擷取出來,讓 Elasticsearch 的 @timestamp 有正確的時間。

  • 將 IP 的欄位,透過反查 GeoIP 的資料庫,加入 GeoLocation 的資訊在文件中,以利於之後能使用地圖檢視資料。

  • 將文字內容,依照特定的格式,拆解成結構化的 JSON 欄位與值。

  • 將 URL 拆解成包含 path、scheme、port、domain、query…各個欄位。

  • 將 URL 中 Query String 裡的 Key / Value,轉成結構化的 JSON 欄位與值。

  • 當某個條件成立時,填加某個固定值。

Ingest Pipeline 的運作及使用方式

24-ingest-pipeline-flow

上圖是 Ingest Pipeline 運作時的主要流程,在這過程中,我們分別解釋每個階段的運作及操作方式:

  • Incoming Documents (傳入的文件): 在使用 Indexing API、或是 _bulk API 將文件傳入至 Elasticsearch 準備進行 Indexing 時,可以指定要使用哪一組預先定義好的 Ingest Pipeline。

  • Ingest Pipeline (擷取管道): Elasticsearch 在到 Indexing 的請求時,如果有指定 Ingest Pipeline,Coordinator (協調者) Node 會把這個請求,交給 ingest node,透過定義好的 Pipeline 設定,經由當中指定的各種 Processor (處理器) 一步一步的將資料進行處理。

  • Target Index (目標索引): Ingest Pipeline 在處理完之後,會將最終的文件,透過 Coordinator 傳送到 Primary Shard 所在的 Node,進行 Indexing 後續的處理。

接下來將會依照實際準備時的步驟,進行說明。

定義 Ingest Pipeline

使用 Ingest APIs

在文件進入 Elasticsearch 之前,我們必須先準備好 Ingest Pipeline 的定義,這邊主要是透過 _ingest 的 API 進行設定:

PUT /_ingest/pipeline/<pipeline>
  • <pipeline>:是自己取的 pipeline 名稱

提供一個實際的範例如下:

PUT /_ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "description" : "My optional pipeline description",
  "processors" : [
    {
      "set" : {
        "description" : "My optional processor description",
        "field": "my-keyword-field",
        "value": "foo"
      }
    }
  ],
  "_meta": {
    "reason": "set my-keyword-field to foo",
    "serialization": {
      "class": "MyPipeline",
      "id": 10
    }
  }
}
  • Pipeline 的名字是 my-pipeline-id。

  • version 是提供使用者自己記錄及參考,與 ingest pipeline 本身的運作功能無關,是選用的欄位。

  • _meta 是提供給使用者自己存放自己想要額外加入的資訊所使用的,如果 pipeline 是由程式或其他機制在管理時,可以額外記錄一些參考的資訊。

除了建立 pipeline 的這個 API,Ingest API 總共有提供:

  • Create or update pipeline

  • Get pipeline

  • Delete pipeline

使用 Kibana Ingest Node Pipeline

另外 Kibana 也有提供 UI 的設定畫面,可以在 Kibana > Stake Management > Ingest > Ingest Node Pipelines 建立或管理 Ingest Pipeline:

在建立 Create Pipeline 時,就能夠使透過 Add a processor 進入以下的畫面選擇要使用的 processor 並進行相關的設定。

上線前使用 Simulate 模擬一下

當我們依照需求定義好 Ingest Pipeline 之後,我們可以透過 _simulate API,並提供測試的文件,確認一下 Ingest Pipeline 的執行結果。

_simulate API 有提供兩種模式,第一種是針對還沒有建立 Pipeline 時,一併透過 API 模擬指定的 Pipeline 定義 + 測試文件,另一種是已經建立好 Pipelien 的定義,提供測試文件來試用。

Simulate 已建立好的 Pipeline

我們針對 my-pipeline-id 這個 pipeline 來測試,並提供兩份文件:

POST /_ingest/pipeline/my-pipeline-id/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

可以得到回傳結果:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

Simulate 指定的 Pipeline 規則 + 測試的文件

我們不用指定 Pipeline 名稱,直接使用 _ingest/pipeline/_simulate 的 API,並且在 Request Body 中帶入 pipeline 的定義:

POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

以下是 Simulate 的回傳結果:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

使用 Kibana 的 Test Pipeline

在 Kibana 之中,也有對應的 Test Pipeline 功能,可以直接填入測試的文件,來檢驗 Pipeline 的運作結果。

Indexing 資料時,使用 Ingest Pipeline 的方法

當上述的方式建立好 Ingest Pipeline 之後,我們將 Document Indexing 進入 Elasticsearch 時,就可以指定要使用 Ingest Pipeline,以下是常用的幾種方式:

Index API

使用 Index API 時,指定 pipeline 的名字

POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

Bulk API

使用 Bulk API 時,同樣也可以指定 pipeline 的名字

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

Update By Query

使用 Update by Query 時,也可以指定 pipeline 的名字

POST my-data-stream/_update_by_query?pipeline=my-pipeline

Reindex

Reindex 時也可以指定 pipeline 的名字

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

Index Setting 或透過 Index Tempate

在 Index Setting 中,可以透過以下的設定,來決定資料寫入這個 Index 時,要透過 ingest pipeline 來處理:

  • index.default_pipeline:如果 request 沒有帶入指定的 pipeline,就會依照這個設定來執行。

  • index.final_pipeline:這是不論有沒有其他經由 request 帶入的 pipeline 或是 default_pipeline 的設定,在最後都一定會執行的的 pipeline 設定。(也就是如果有其他指定的 pipeline,這兩種 pipeline 都會被執行)

因此也可以透過 Index Template 指定 Index setting 中的這兩個設定值。

其他 Elastic Stack

其他 Beats、Logstash,也都會有對應的設定,能夠在資料透過 Index API 或是 Bulk API 寫入時,指定要使用的 Ingest Pipeline,詳細請參考這些產品的官方文件說明。

使用 Ingest Pipeline 的注意事項

  • Elasticsearch Cluster 中,至少要有一個有啟動 ingest 角色的 Node,如果 Ingest 的工作量很繁重的話,建議安排專門處理 Ingest 的 Node 來進行 Ingest 任務的處理,又或是使用 Logstash 等其他 ETL 工具,避免佔用資源而影響 Elasticsearch 其他功能的運作。

  • 如果有啟用 Security 的功能,會需要擁有 manage_pipeline 的權限,如果要從 Kibana 的 Ingest Node Pipeline 畫面來操作 Ingest Pipeline 的功能的話,另外還會需要 cluster:monitor/nodes/info 的權限。

  • 在使用 Pipeline 時,版本管理也會是很重要的一件事,為了能更有效率的避免 Pipeline 的定義是舊版,而造成資料 Indexing 時是以非預期的方式處理,善用 version 的版本號碼,並且將 Pipeline 的定義進行版控管理,在佈署或除錯時,也能多透過確認 Pipeline 的 version 來確保版本的正確性。


以上介紹了 Elasticsearch Ingest Pipeline 的基本說明,接下來會以實際的例子進行介紹,說明 Ingest Pipeline 如何協助我們將 Log 結構化。

參考資料

processors 裡面指定 1 至多個 processors,這部份會是資料 Transform (轉換) 的主要處理,每個 processors 會依照先後順序來執行,一個 processor 做完後會將輸出交給下一個 processor 進行處理,也就是這個功能取名 pipeline 的原因,Elasticsearch 中內建 30 多個 processors,詳細可參考 。

可以參考 的使用說明。

24-kibana-ingest-pipeline
24-kibana-ingest-pipeline-create
24-kibana-ingest-pipeline-test

📘
官方文件 - Ingest Processor Reference
官方文件 - Ingest APIs
官方文件 - Ingest Pipelines
官方文件 - Ingest Processor Reference
官方文件 - Ingest APIs