Enrich 資料與例外處理

本篇學習重點

  • 使用 Ingest Pipeline 時,想要透過查找其他資訊,將相關的資訊加入到處理的文件中,這個 Enrich 功能是如何運作及要怎麼使用。

  • 使用 Ingest Pipeline 時,若發生錯誤,要如何進行例外狀況的處理。


如何在 Ingest Pipeline 時 Enrich 資料

什麼是 Enrich

Enrich (充實、使豐富),指的是在 Ingest Pipeline 中,透過其他地方取得相關的資料,並加在原來的資料當中,讓資料更為豐富。

這種做法在資料處理 ETL (Extract, Transform, Load) 的過程中蠻常使用,也很重要的一種做法,能讓我們能做到『空間換時間』或是『先苦後甘』這樣的目的。

由於 Elasticsearch 不是關聯式資料庫,而是 Document Based (文件型) 的 NoSQL 資料庫,所以文件在存入 Elasticsearch 之前應該要視情況去正規化,同時為了追求查詢時能有較快的執行速度,會在文件存入時,盡可能將文件查詢時會使用到的資訊先一併寫入在文件之中,避免後續執行時要另外透過 Elasticsearch 的 Join 或是 Application 端另外處理資料查詢及合併等動作。

例如以下幾種情境:

  • 在存放銷售訂單的 Document 中,依照訂單裡的 Product ID 將 Product 的詳細資料查詢出來,加在銷售訂單的文件之中。

  • 透過已定義好的 IP 位置清單,識別出某一筆處理的請求是來自於客戶或是某個供應商。

  • 根據地理坐標,查出地址或是郵遞區號,加在原來的文件之中。

  • Web Server 的存取 logs,透過 IP反查出 Geo Location 的坐標資訊並且記錄在 log 之中,讓日後使用時能直接透過地圖呈現地區的分佈。

  • 在使用者點擊觀看影片記錄的 log 之中,先將日後分析時會使用到的影片類型、使用者資訊,先反查出來添加在 log 之中。

在 Elasticsearch Ingest Pipeline 的處理過程中,有定義一個 Enrich Processor ,就是專門提供資料 Enrich 的處理,接著將介紹這個 Enrich Processor 的運作方式。

Enrich Processor 的運作方式

先摘錄 Enrich Processor 的運作重點:

  • Lookup (查找) 的來源 (Source Index) 只能是 Elasticsearch Index,不支援從 Elasticsearch 的外部讀資料。

  • 會依照 Policy 的查找規則,將符合規則的資料轉存在另一個 Enrich Index 中。

  • Enrich Processor 在運作時,只會比對 Enrich Index 裡的資料,有找到就會加入到 Document 裡。

  • Source Index 的資料更新時,不會反應到 Enrich Index 裡,會需要另外重新執行 Policy,才能重新產生新的 Enrich Index 資料。

接下來我們針對運作的架構與流程進行較細部的說明。

上圖的運作架構,在 Ingest Pipeline 的處理過程中,加上了 enrich processor ,這個 enrich 的背後,共有三個不同的角色:

Enrich Policy

首先 Enrich Policy 是一組需要另外建立的設定,其中定義了 Enrich 的操作應該如何進行,包含

  • 定義存放 Enrich 資料的 Source Index。

  • policy_type 定義找資料時要用哪一種比對方式。

  • 指定 match 欄位,表示要從 Source Index 中的哪個欄位來進行查尋。

  • enrich_fields ,要將從 Source Index 中查尋到文件裡的哪些欄位,加入到原來的文件中。

Enrich Policy 是要經過 Execute (執行) 的 API 來觸發運作,並不是自動會在背景執行的機制,在執行時,會將 Source Index 裡符合條件的資料找出,並寫入到 Enrich Index 當中進行獨立的儲存。

注意,Enrich Policy 建立後不能修改,只能刪除並建立新的 Enrich Policy。

Source Index

Enrich 的處理過程中,會透過某個資料的來源進行查詢以取得額外的資料,這個資料來源必須是 Elasticsearch 中的 Index,也就所謂的 Source Index。

Source Index 可以是一個或多個 Elasticsearch 的 Index,而這個 Index 其實就是一般 Elasticsearch 的 Index,並沒有不同,所以能用一般存取的方式進行資料的維護,並且一個 Elasticsearch 的 Index 可以同時當作多個不同 Enrich 處理的 Source Index。

Enrich Index

由於每次 Enrich Processor 在處理 Indexing 的文件時,若當下直接從 Source Index 查找資料時,因為較花資源,另外也可能因為查詢條件較複雜會執行較久,所以 Enrich 的運作機制中,有定義了 Enrich Index,讓 Enrich Policy 執行時,透過 Elasticsearch 所建立一個系統層級的 Index,並且會與 Enrich Policy 綁定,裡面存放著在 Source Index 裡找到的文件,也是 Enrich Processor 在處理 Indexing 文件時,實際會用來查找資料的資料來源。

Enrich Index 有以下幾個特性:

  • 是由 Elasticsearch 所建立及維護的 Index,因此不應該直接去使用這些系統 Index。

  • Enrich Index 的名稱會是 .enrich-* 開頭。

  • Enrich Index 被建立之後,會執行 Segment files 的 force merged 的,以增加查詢時的效率。

  • Enrich Index 是唯讀的,也就是無法修改裡面的內容。

使用 Enrich Processor 的完整步驟

在了解 Enrich Processor 的運作方式之後,這邊來介紹要使用時的完整步驟:

  1. 準備 Source Index:在 Indexing Document 時,提供 Ingest Pipeline 的 Enrich 查閱的資料,將這些資料存放在 Elasticsearch 的 Index 之中。

  2. 建立 Enrich Policy:透過 create enrich policy API 來建立 Enrich Policy。

  3. 執行 Enrich Policy:使用 execute enrich policy API 針對上面建立好的 Enrich Policy 來觸發執行,並建立出 Enrich Index。

  4. 在 Ingest Pipeline 中指定 enrich processor:可以將 enrich processor 添加到現有的 Ingest Pipeline 之中,或是建立新的 Ingest Pipeline。

  5. 將文件 Indexing 到 Elasticsearch 之中,並指定使用上面建立好的 Ingest Pipeline。

  6. 如果查閱的資料有異動,先更新到 Source Index 之中,再執行步驟 3 的 execute enrich policy,將 Enrich Index 的資料進行更新,如果先前已經 Ingest 的資料也想要回溯,可以另外透過 _reindex 或 update_by_query 並指定 Ingest Pipeline,以使用新的 Enrich Index 來更新資料。

  7. 如果 Enrich Policy 要修改,先建立新的 Enrich Policy,並且修改 enrich processor 使用新的 Enrich Policy,再刪除舊的 Enrich Policy。

一個實際使用 Enrich Processor 的例子

依照上述的步驟,我們首先準備 Source Index users

PUT /users/_doc/1?refresh=wait_for
{
  "email": "mardy.brown@asciidocsmith.com",
  "first_name": "Mardy",
  "last_name": "Brown",
  "city": "New Orleans",
  "county": "Orleans",
  "state": "LA",
  "zip": 70116,
  "web": "mardy.asciidocsmith.com"
}

接著我們定義 Enrich Policy - users-policy,並且指定使用 email 欄位來進行查閱,若有查到,我們要將 first_namelast_namecityzipstate 的資料增加到 indexing 的文件中。

PUT /_enrich/policy/users-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}

執行 Enrich Policy,以建立 Enrich Index。

POST /_enrich/policy/users-policy/_execute

這時可以先使用 _cat/indices 查看 Enrich Index 是否有正確建立:

GET _cat/indices/.enrich-users-policy*?v

並使用 _search 查看 Enrich Index 裡的內容:

GET .enrich-users-policy-*/_search

接著我們建立 Ingest Pipeline 並且使用 enrich processor

PUT /_ingest/pipeline/user_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add 'user' data based on 'email'",
        "policy_name": "users-policy",
        "field" : "email",
        "target_field": "user",
        "max_matches": "1"
      }
    }
  ]
}

我們可以 Indexing 文件,並指定 Ingest Pipeline 來確認是否正常運作

PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
  "email": "mardy.brown@asciidocsmith.com"
}

最後確認 Indexing 進入 Elasticsearch 的文件有正確的如我們的預期被 Enrich。

GET /my-index-000001/_doc/my_id

參考官方 Geo Location 的範例

除了上述的 term 查閱的 Enrich 方式,Enrich Processor 也有提供 geo_shape 查閱方式,可以參考 官方文件 - Enrich you data based on geolocation

使用 Ingest Pipeline 時的例外處理

使用 Ingest Pipeline 時,如果發生錯誤,預設的處理行為會丟出 Exception (例外狀況) 的錯誤,並且停止這筆資料的 Indexing 處理。

如果我們希望在某一個特定 Ingest Processor 的處理發生錯誤時,能忽略這個錯誤,繼續的向下執行,我們可以有三種作法:

  1. 在 processor 的設定中,指定 ignore_failure 的屬性,並設定成 true ,讓錯誤發生時,直接略過當前的 processor,進入下一個 processor 的處理。

  2. 在 processor 的設定中,指定 on_failure 的設定,讓錯誤發生時,執行另外一系列的 processors。(裡面的 processor 也可以再指定錯誤發生時的 on_failure,型成巢狀的設定)

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
  1. 直接在最外層的 pipeline 設定 on_failure,將整個 pipeline 最終會發生的錯誤,給抓住。(以下的例子配合 set processor,將這筆發生錯誤的資料,另外寫到指定的 index 中。)

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

在使用 on_failure 時,也可以使用以下的屬性,取得錯誤相關的資訊:

  • on_failure_message

  • on_failure_processor_type

  • on_failure_processor_tag

  • on_failure_pipeline

使用方式如下:

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

參考資料

Last updated