各種常用的 Processor

本篇學習重點

  • Ingest Pipeline 中常用的 Processor 及使用的方式

Ingest Pipeline 常用的 Processor

這邊會先介紹 Ingest Pipeline 當中,常用到的幾個 Processor,並且說明他們的能力與效果。

Grok

Grok 怎麼使用我就不多介紹了,如果不知道 Grok 要怎麼使用的,可以上網查詢,有非常多的資料。

Grok 在針對『非結構化的文字資料,整理成結構化的資料』會是非常常用的一個重要工具,這邊要特別介紹的是, Elasticsearch 在 Grok 的支援上,有許多內建好的 Patterns 可以直接拿來使用,至於有支援哪些,請直接從 GitHub - Elasticsearch Grok Patterns 查看,特別是裡面的 grok-patterns 檔案,記錄了一些通用型的 patterns。

至於 Grok processor 的使用方式如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
  "description" : "parse multiple patterns",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}"],
        "pattern_definitions" : {
          "FAVORITE_DOG" : "beagle",
          "FAVORITE_CAT" : "burmese"
        },
        "trace_match": true
      }
    }
  ]
},
"docs":[
  {
    "_source": {
      "message": "I love burmese cats!"
    }
  }
  ]
}
  • field:從哪個欄位中讀取資料。

  • patterns:Grok patterns 的描述字串,可以使用已經定義好的 patterns,也可以自己定義。(上例是將比對到的結果,存放到 pet 欄位中)

  • pattern_definitions:可以自行指定相同字串比對的規則,值的宣告中,也可以使用 | 來定義多個值。

  • trace_match:是否要回傳 _grok_match_index 這個 grok 執行結果的資訊。

回傳結果為:

{
  "docs": [
    {
      "doc": {
        "_type": "_doc",
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "message": "I love burmese cats!",
          "pet": "burmese"
        },
        "_ingest": {
          "_grok_match_index": "1",
          "timestamp": "2016-11-08T19:43:03.850+0000"
        }
      }
    }
  ]

注意:為了避免某些 Grok 的處理花太久的時間、佔用太多系統資源,Elasticsearch 當中有定義 ingest.grok.watchdog.intervalingest.grok.watchdog.max_execution_time (預設值都是 1 秒),來檢查及限制 Grok 任務的處理。

Dissect

這個是與 Grok processor 類似的功能,但功能較單純一些,可以說在 Elasticsearch Ingest Pipeline 當中,更值得被優先選擇用來處理『非結構化的文字資料,整理成結構化的資料』的 processor,因為處理的方式較單純,不支援正規表示式 (Regular Expression),也因此執行速度在不少情境下比 Grok 快非常多。

注意:效能考量,基本上可以當作如果能使用 Dissect 做到的,就不要用 Grok。

Dissect processor 的使用方式

{
  "dissect": {
    "field": "message",
    "pattern" : "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}"
   }
}

針對以下的文件內容

{
  "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171"
}

可以拆解並得到以下結構化的結果

"doc": {
  "_index": "_index",
  "_type": "_type",
  "_id": "_id",
  "_source": {
    "request": "/english/venues/cities/images/montpellier/18.gif",
    "auth": "-",
    "ident": "-",
    "verb": "GET",
    "@timestamp": "30/Apr/1998:22:00:52 +0000",
    "size": "3171",
    "clientip": "1.2.3.4",
    "httpversion": "1.0",
    "status": "200"
  }
}

使用 Dissert 時,主要是使用 %{keyname} 並在比對到值的時候,將抓到的值存放到對應 keyname 的欄位中,

另外可以配合使用以下這些主要的修飾符 (Modifier):

修飾符
使用方式
描述
文件

->

%{keyname1->}

右方的字元不論重覆多少次,都忽略它,常用在右方有很多空格時,要一口氣忽略,就可以使用。

+

%{+keyname} %{+keyname}

將多個比對到的欄位結果,合併在一起,可以透過 append_separator 指定這些值合併時的分隔符號,預設是空格。

+ with /n

%{+keyname/2} %{+keyname/1}

+ 一樣是合併多個結果,但多透過 /n 來指定這些值合併時的順序,n 從 1 開始。

?

%{?ignoreme}

忽略比對到的這個結果,其實效果和 %{} 是一樣的,不過指定名字會更利於 Dissert pattern 的閱讀。

* and &

%{*r1} %{&r1}

使用 * 將從比對到的結果當成欄位的名稱,並且欄位的值是 & 比對到的結果,例子中的 r1 代表的只是相同名字的 *& 是同一組。

Date

由於 Elastic Common Schema 的規範之中,每個 event 都會需要有 @timestamp 的欄位,並且實務中要將日期的資料使用日期的格式儲存在 Elasticsearch 中也是很常見的需求,因此要先介紹 Date processor。

{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "field" : "initial_date",
        "target_field" : "timestamp",
        "formats" : ["ISO8601"],
        "timezone" : "{{{my_timezone}}}",
        "locale" : "{{{my_locale}}}"
      }
    }
  ]
}

Date Processor 擁有幾個重要的設定:

  • field:從哪個欄位中讀取資料。

  • format:依照哪一種日期格式來解析資料,支援常見在 JSON 中使用的日期格式 ISO8601 或是 timestamp 的數字表示方式 UNIXUNIX_MS 或是使用 Java 的 date format 定義時間格式。

  • timezone:支援指定時區。

  • locale:支援日期格式中與地區語言相關的表示法,例如日星期、月份有些格式會用英文來表示。

Convert

如果要將某個欄位的型態,轉換成另種型態,就可以使用 Convert processor。

PUT _ingest/pipeline/my-pipeline-id
{
  "description": "converts the content of the id field to an integer",
  "processors" : [
    {
      "convert" : {
        "field" : "id",
        "type": "integer"
      }
    }
  ]
}
  • field:從哪個欄位中讀取資料。

  • type:轉換成哪個指定的型態。

上述的例子就是將原本可能是字串型態的欄位,指定轉換成為 integer 的型態。

Fingerprint

有時我們放入 Elasticsearch 的文件沒有能當作識別的唯一主鍵 (Primary Key),但是有多個欄位合併在一起就能代表是唯一的複合鍵 (Composite Key),當我們想避免資料重覆被 indexing 進入 Elasticsearch 時,又或是進入 Elasticsearch 之後,想要更有效的找出這些重覆的資料時,就是在事前加工,將這些欄位合併在一起產生出一個唯一的 Fingerprint (指紋),就能當成識別使用。

Finterprint processor 的使用方式:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "fingerprint": {
          "fields": ["user"],
          "target_field": "unique_key",
          "method": "SHA-1"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "user": {
          "last_name": "Smith",
          "first_name": "John",
          "date_of_birth": "1980-01-15",
          "is_active": true
        }
      }
    }
  ]
}
  • fields:主要在這裡提供多個值,甚至如上方的例子能直接給予一個物件。

  • target_field:產生的 Fingerprint 要存在哪個欄位中,預設是 fingerprint 的欄位。

  • method:使用哪種 Hash Function,有支援 MD5SHA-1SHA-256SHA-512MurmurHash3

  • salt:有需要的話甚至可以另外指定 salt 的值加在 Hash Function 的運算之中。

GeoIP

當我們有 IP 的資料,想要轉換成地理位置的資訊,在查詢時甚至能使用地圖的方式來呈現時,我們就可以使用 GeoIP processor:

{
  "description" : "Add geoip info",
  "processors" : [
    {
      "geoip" : {
        "field" : "ip"
      }
    }
  ]
}

當我們指定某個 IP 值,要透過 geoip processor 進行處理:

PUT my-index-00001/_doc/my_id?pipeline=geoip
{
  "ip": "8.8.8.8"
}

產生出來的結果就會是

{
  "found": true,
  "_index": "my-index-00001",
  "_type": "_doc",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 55,
  "_primary_term": 1,
  "_source": {
    "ip": "8.8.8.8",
    "geoip": {
      "continent_name": "North America",
      "country_name": "United States",
      "country_iso_code": "US",
      "location": { "lat": 37.751, "lon": -97.822 }
    }
  }
}

GeoIP processor 預設是使用 MaxMind 這間公司所提供免費版的資料庫,如果有要使用其他特定的資料庫來源,可以透過 database_file 來指定。

另外針對產生出來的欄位,可以透過 target_field 來指定名稱,甚至透過 properties 來決定產生出來的物件裡面要包含哪些 Geo 屬性。

KV

KV 是指 Key/Value,能將在字串中同樣規則的 Key/Value 值給解析出來。

例如最常使用在 URL 的 Query String 當中,如果我們想把 Query String 裡的值都結構化,可以使用:

{
  "kv": {
    "field": "query_string",
    "field_split": "&",
    "value_split": "="
  }
}

就能將 URL 的格式

{
  "query_string": "q=elasticsearch&locale=en"
}

轉變成

"doc": {
  "_index": "_index",
  "_type": "_type",
  "_id": "_id",
  "_source": {
    "query_string": "q=elasticsearch&locale=en",
    "q": "elasticsearch",
    "locale": "en"
  }
}

也能透過 target_field 將所以拆解出來的結果收集在某一個欄位之中。

Pipeline

如果我們定義了許多的 Pipeline,但希望能『模組化』的方式,將某些宣告重覆使用,就可以透過這個 Pipeline Processor 的方式,來組合其他定義好的 Processor。

例如我們先定義一個 pipelineA

PUT _ingest/pipeline/pipelineA
{
  "description" : "inner pipeline",
  "processors" : [
    {
      "set" : {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}

在另外的 pipelineB 有其他的處理要進行,但又希望使用到 pipelineA 所定義的部份,就可以像以下的方試來宣告:

PUT _ingest/pipeline/pipelineB
{
  "description" : "outer pipeline",
  "processors" : [
    {
      "pipeline" : {
        "name": "pipelineA"
      }
    },
    {
      "set" : {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}

其他的 Processors

其他還有許多好用的 Processors,例如:

  • drop:刪除某個欄位。

  • set:增加一個新欄位並填入指定的值。

  • urldecode:常常收集到的 log 裡面的 URL 是有 encode 過的,當裡面有非英文的語言、或是特殊符號時,在 Log 的解讀上會很辛苦,可以先透過 urldecode processor 進行 URL decode。

  • uri_parts:直接依照 URI 的標準,拆解出 schemedomainportpathqueryextensionfragment…等欄位。

  • user_agent:將 User Agent 的字串,拆解出 nameversionosdevice…等欄位。

  • script:透過 painless 的語言,編寫處理的邏輯,很強大的 processor。

  • fail:配合 if 的條件設定,可以在 ingest pipeline 時進行檢查,針對不符合要求的文件,拋出錯誤。

參考資料

Last updated