喬叔的 Elastic Stack 專業教育訓練
  • 喬叔的 Elastic Stack 專業教育訓練
  • 🧑關於喬叔 (Joe Wu)
  • Elastic 課程公開班
    • 🎯Elasticsearch 基礎實務班
      • 💯學員課後回饋
    • 🆕Elasticsearch 進階運維班
      • 💯學員課後回饋
    • Elasticsearch 進階開發班
    • Elastic Stack 基礎實務班
    • Elastic Observability 基礎實務班
    • 📩課程許願池
  • 技術分享
    • 📗喬叔帶你上手 Elastic Stack
      • 前言
      • Elastic Cloud 如何建立 Deployment
        • ES Node 的種類
        • 配置的選擇
      • Index 建立前你該知道的
        • ES Index 如何被建立
        • ES 的超前佈署 - Dynamic Mapping
        • ES 的超前佈署 - Index Template
        • ES Index 的別名 (Alias)
        • 管理你的 Index - Kibana Index
      • 管理 Index 的 Best Practices
        • Shard 的數量與 Rollover & Shrink API
        • 三溫暖架構 - Hot Warm Cold Architecture
        • Index Lifecycle Management (ILM)
        • Rollup
        • Transform
        • Snapshot Lifecycle Management (SLM)
        • 總結
      • Elastic Cloud 比免費版還多的功能
        • Elastic Stack 的方案比較與銷售方式
        • Centralized Beats Management
        • Centralized Pipeline Management
        • Watcher
        • Elasticsearch Token Service
        • Multi-stack monitoring & Automatic stack issue alerts
      • 向 App Search 學習怎麼用 Elasticsearch
        • 揭開 App Search 的面紗
        • Engine 的 Index Settings 篇
        • Engine 的 Mapping 篇
        • Engine 的 Search 基礎剖析篇
        • Engine 的 Search 進階剖析篇
      • Elasticsearch 的優化技巧
        • Indexing 索引效能優化
        • Searching 搜尋效能優化
        • Index 的儲存空間最佳化
        • Shard 的最佳化管理
      • 完賽心得
    • 📘喬叔帶你上手 Elastic Stack - 探索與實踐 Observability 系列
      • 前言 & 淺談 Observability
      • Elastic 的 Observability 解決方案
      • Uptime - 掌握系統的生命徵象
        • 我們要觀測的生命徵象是什麼?
        • 使用 Heartbeat 收集系統生命徵象數據
        • 透過 Kibana 觀看心電圖及設定警報
        • 使用合成監控 (Synthetics Monitor) 從使用者情境驗證服務的運作狀態
      • Metrics - 觀察系統的健康指標
        • Metrics 與 Metricbeat 的基本介紹
        • 使用 Metricbeat 掌握 Elastic Stack 的健康狀態
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 Host 篇
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 Docker 篇
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 Kubernetes 篇
        • 使用 Metricbeat 掌握 Infrastructure 的健康狀態 AWS 篇
      • Logs - 挖掘系統內部發生的狀況
        • Logs 與 Filebeat 的基本介紹
        • 使用 Filebeat 應該要了解的設計細節與原理
        • 透過 Filebeat 收集 Elastic Stack 中各種服務的細節資訊
        • 透過 Filebeat 收集 Infrastructure 中各種服務的細節資訊
      • Traces - 觀察應用程式的效能瓶頸
        • Elastic APM 基本介紹
        • 使用 APM-Integratoin-Testing 建立 Elastic APM 的模擬環境
        • 如何在 Kibana 使用 APM UI
        • 使用 APM Server 來收集 APM 數據
        • 透過 APM Agents 收集並傳送後端服務運作的記錄
        • 透過真實使用者監控 (RUM, Real User Monitoring) 來改善使用者體驗
      • 建立結構化的 Log
        • Elastic Common Schema 結構化 Log 的規範
        • Elasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手
          • 基本介紹
          • 各種常用的 Processor
          • Enrich 資料與例外處理
      • 有效的使用 Observability 的資料
        • 透過 Machine Learning 發現異常的問題
        • 使用 Kibana Alerts 主動通知異常狀況
        • 資料的生命週期管理
        • 使用 Elastic Observability 追縱及觀察問題的心得
      • 完賽心得
    • 😀Elasticsearch 技術分享小品
      • 🤖Elastic 與 AI
        • Elasticsearch Inference API 讓我們直接在 ES 裡運用 OpenAI Completion API
    • 🎥線上分享
      • 喬叔 Elasticsearch Index 管理與效能優化技巧
      • Elastic Certification 認證經驗分享
    • 🛠️workshop
      • 如何在 Elasticsearch 實現敏捷的資料建模與管理 @ DevOpsDays 2023
        • 工作坊實作內容
      • Elastic Observability 實作體驗坊 @ DevOpsDays 2022
        • 行前準備
        • 工作坊實作內容
      • 當 Elasticsearch 搜尋引擎遇上 AI @ HelloWordDevConference 2024
        • 投影片
        • Elasticsearch 環境準備
        • Google Colab 環境準備
        • 工作坊操作說明
        • ElasticSearch Relevance Engine (ESRE)
    • ⬆️Elastic Stack 版本升級記錄
      • 🔍Elasticsearch
  • 其他專業服務
    • 👩‍🎓企業包班 | 企業內訓
    • 👨‍💼顧問服務
    • 🈺專案合作
    • 🧩Elastic 授權代理
  • 相關連結
    • Facebook 粉絲頁
Powered by GitBook
On this page
  • 本篇學習重點
  • 如何在 Ingest Pipeline 時 Enrich 資料
  • 什麼是 Enrich
  • Enrich Processor 的運作方式
  • 使用 Enrich Processor 的完整步驟
  • 一個實際使用 Enrich Processor 的例子
  • 參考官方 Geo Location 的範例
  • 使用 Ingest Pipeline 時的例外處理
  • 參考資料
  1. 技術分享
  2. 喬叔帶你上手 Elastic Stack - 探索與實踐 Observability 系列
  3. 建立結構化的 Log
  4. Elasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手

Enrich 資料與例外處理

本篇學習重點

  • 使用 Ingest Pipeline 時,想要透過查找其他資訊,將相關的資訊加入到處理的文件中,這個 Enrich 功能是如何運作及要怎麼使用。

  • 使用 Ingest Pipeline 時,若發生錯誤,要如何進行例外狀況的處理。


如何在 Ingest Pipeline 時 Enrich 資料

什麼是 Enrich

Enrich (充實、使豐富),指的是在 Ingest Pipeline 中,透過其他地方取得相關的資料,並加在原來的資料當中,讓資料更為豐富。

這種做法在資料處理 ETL (Extract, Transform, Load) 的過程中蠻常使用,也很重要的一種做法,能讓我們能做到『空間換時間』或是『先苦後甘』這樣的目的。

由於 Elasticsearch 不是關聯式資料庫,而是 Document Based (文件型) 的 NoSQL 資料庫,所以文件在存入 Elasticsearch 之前應該要視情況去正規化,同時為了追求查詢時能有較快的執行速度,會在文件存入時,盡可能將文件查詢時會使用到的資訊先一併寫入在文件之中,避免後續執行時要另外透過 Elasticsearch 的 Join 或是 Application 端另外處理資料查詢及合併等動作。

例如以下幾種情境:

  • 在存放銷售訂單的 Document 中,依照訂單裡的 Product ID 將 Product 的詳細資料查詢出來,加在銷售訂單的文件之中。

  • 透過已定義好的 IP 位置清單,識別出某一筆處理的請求是來自於客戶或是某個供應商。

  • 根據地理坐標,查出地址或是郵遞區號,加在原來的文件之中。

  • Web Server 的存取 logs,透過 IP反查出 Geo Location 的坐標資訊並且記錄在 log 之中,讓日後使用時能直接透過地圖呈現地區的分佈。

  • 在使用者點擊觀看影片記錄的 log 之中,先將日後分析時會使用到的影片類型、使用者資訊,先反查出來添加在 log 之中。

在 Elasticsearch Ingest Pipeline 的處理過程中,有定義一個 Enrich Processor ,就是專門提供資料 Enrich 的處理,接著將介紹這個 Enrich Processor 的運作方式。

Enrich Processor 的運作方式

先摘錄 Enrich Processor 的運作重點:

  • Lookup (查找) 的來源 (Source Index) 只能是 Elasticsearch Index,不支援從 Elasticsearch 的外部讀資料。

  • 會依照 Policy 的查找規則,將符合規則的資料轉存在另一個 Enrich Index 中。

  • Enrich Processor 在運作時,只會比對 Enrich Index 裡的資料,有找到就會加入到 Document 裡。

  • Source Index 的資料更新時,不會反應到 Enrich Index 裡,會需要另外重新執行 Policy,才能重新產生新的 Enrich Index 資料。

接下來我們針對運作的架構與流程進行較細部的說明。

上圖的運作架構,在 Ingest Pipeline 的處理過程中,加上了 enrich processor ,這個 enrich 的背後,共有三個不同的角色:

Enrich Policy

首先 Enrich Policy 是一組需要另外建立的設定,其中定義了 Enrich 的操作應該如何進行,包含

  • 定義存放 Enrich 資料的 Source Index。

  • policy_type 定義找資料時要用哪一種比對方式。

  • 指定 match 欄位,表示要從 Source Index 中的哪個欄位來進行查尋。

  • enrich_fields ,要將從 Source Index 中查尋到文件裡的哪些欄位,加入到原來的文件中。

Enrich Policy 是要經過 Execute (執行) 的 API 來觸發運作,並不是自動會在背景執行的機制,在執行時,會將 Source Index 裡符合條件的資料找出,並寫入到 Enrich Index 當中進行獨立的儲存。

注意,Enrich Policy 建立後不能修改,只能刪除並建立新的 Enrich Policy。

Source Index

Enrich 的處理過程中,會透過某個資料的來源進行查詢以取得額外的資料,這個資料來源必須是 Elasticsearch 中的 Index,也就所謂的 Source Index。

Source Index 可以是一個或多個 Elasticsearch 的 Index,而這個 Index 其實就是一般 Elasticsearch 的 Index,並沒有不同,所以能用一般存取的方式進行資料的維護,並且一個 Elasticsearch 的 Index 可以同時當作多個不同 Enrich 處理的 Source Index。

Enrich Index

由於每次 Enrich Processor 在處理 Indexing 的文件時,若當下直接從 Source Index 查找資料時,因為較花資源,另外也可能因為查詢條件較複雜會執行較久,所以 Enrich 的運作機制中,有定義了 Enrich Index,讓 Enrich Policy 執行時,透過 Elasticsearch 所建立一個系統層級的 Index,並且會與 Enrich Policy 綁定,裡面存放著在 Source Index 裡找到的文件,也是 Enrich Processor 在處理 Indexing 文件時,實際會用來查找資料的資料來源。

Enrich Index 有以下幾個特性:

  • 是由 Elasticsearch 所建立及維護的 Index,因此不應該直接去使用這些系統 Index。

  • Enrich Index 的名稱會是 .enrich-* 開頭。

  • Enrich Index 被建立之後,會執行 Segment files 的 force merged 的,以增加查詢時的效率。

  • Enrich Index 是唯讀的,也就是無法修改裡面的內容。

使用 Enrich Processor 的完整步驟

在了解 Enrich Processor 的運作方式之後,這邊來介紹要使用時的完整步驟:

  1. 準備 Source Index:在 Indexing Document 時,提供 Ingest Pipeline 的 Enrich 查閱的資料,將這些資料存放在 Elasticsearch 的 Index 之中。

  2. 在 Ingest Pipeline 中指定 enrich processor:可以將 enrich processor 添加到現有的 Ingest Pipeline 之中,或是建立新的 Ingest Pipeline。

  3. 將文件 Indexing 到 Elasticsearch 之中,並指定使用上面建立好的 Ingest Pipeline。

  4. 如果查閱的資料有異動,先更新到 Source Index 之中,再執行步驟 3 的 execute enrich policy,將 Enrich Index 的資料進行更新,如果先前已經 Ingest 的資料也想要回溯,可以另外透過 _reindex 或 update_by_query 並指定 Ingest Pipeline,以使用新的 Enrich Index 來更新資料。

  5. 如果 Enrich Policy 要修改,先建立新的 Enrich Policy,並且修改 enrich processor 使用新的 Enrich Policy,再刪除舊的 Enrich Policy。

一個實際使用 Enrich Processor 的例子

依照上述的步驟,我們首先準備 Source Index users:

PUT /users/_doc/1?refresh=wait_for
{
  "email": "mardy.brown@asciidocsmith.com",
  "first_name": "Mardy",
  "last_name": "Brown",
  "city": "New Orleans",
  "county": "Orleans",
  "state": "LA",
  "zip": 70116,
  "web": "mardy.asciidocsmith.com"
}

接著我們定義 Enrich Policy - users-policy,並且指定使用 email 欄位來進行查閱,若有查到,我們要將 first_name、last_name、city、zip、state 的資料增加到 indexing 的文件中。

PUT /_enrich/policy/users-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}

執行 Enrich Policy,以建立 Enrich Index。

POST /_enrich/policy/users-policy/_execute

這時可以先使用 _cat/indices 查看 Enrich Index 是否有正確建立:

GET _cat/indices/.enrich-users-policy*?v

並使用 _search 查看 Enrich Index 裡的內容:

GET .enrich-users-policy-*/_search

接著我們建立 Ingest Pipeline 並且使用 enrich processor

PUT /_ingest/pipeline/user_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add 'user' data based on 'email'",
        "policy_name": "users-policy",
        "field" : "email",
        "target_field": "user",
        "max_matches": "1"
      }
    }
  ]
}

我們可以 Indexing 文件,並指定 Ingest Pipeline 來確認是否正常運作

PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
  "email": "mardy.brown@asciidocsmith.com"
}

最後確認 Indexing 進入 Elasticsearch 的文件有正確的如我們的預期被 Enrich。

GET /my-index-000001/_doc/my_id

參考官方 Geo Location 的範例

使用 Ingest Pipeline 時的例外處理

使用 Ingest Pipeline 時,如果發生錯誤,預設的處理行為會丟出 Exception (例外狀況) 的錯誤,並且停止這筆資料的 Indexing 處理。

如果我們希望在某一個特定 Ingest Processor 的處理發生錯誤時,能忽略這個錯誤,繼續的向下執行,我們可以有三種作法:

  1. 在 processor 的設定中,指定 ignore_failure 的屬性,並設定成 true ,讓錯誤發生時,直接略過當前的 processor,進入下一個 processor 的處理。

  2. 在 processor 的設定中,指定 on_failure 的設定,讓錯誤發生時,執行另外一系列的 processors。(裡面的 processor 也可以再指定錯誤發生時的 on_failure,型成巢狀的設定)

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
  1. 直接在最外層的 pipeline 設定 on_failure,將整個 pipeline 最終會發生的錯誤,給抓住。(以下的例子配合 set processor,將這筆發生錯誤的資料,另外寫到指定的 index 中。)

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

在使用 on_failure 時,也可以使用以下的屬性,取得錯誤相關的資訊:

  • on_failure_message

  • on_failure_processor_type

  • on_failure_processor_tag

  • on_failure_pipeline

使用方式如下:

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

參考資料

Previous各種常用的 ProcessorNext有效的使用 Observability 的資料

Last updated 2 years ago

26-enrich-process

建立 Enrich Policy:透過 來建立 Enrich Policy。

執行 Enrich Policy:使用 針對上面建立好的 Enrich Policy 來觸發執行,並建立出 Enrich Index。

除了上述的 term 查閱的 Enrich 方式,Enrich Processor 也有提供 geo_shape 查閱方式,可以參考 。

📘
create enrich policy API
execute enrich policy API
官方文件 - Enrich you data based on geolocation
官方文件 - Elasticsearch Ingest Enrich Data
官方文件 - Create Enrich Policy API
官方文件 - Enrich you data based on geolocation
官方文件 - Elasticsearch Ingest - Handling Pipeline Failures