# 各種常用的 Processor

### 本篇學習重點

* Ingest Pipeline 中常用的 Processor 及使用的方式

## Ingest Pipeline 常用的 Processor

這邊會先介紹 Ingest Pipeline 當中，常用到的幾個 Processor，並且說明他們的能力與效果。

### Grok

Grok 怎麼使用我就不多介紹了，如果不知道 Grok 要怎麼使用的，可以上網查詢，有非常多的資料。

Grok 在針對『非結構化的文字資料，整理成結構化的資料』會是非常常用的一個重要工具，這邊要特別介紹的是， Elasticsearch 在 Grok 的支援上，有許多內建好的 Patterns 可以直接拿來使用，至於有支援哪些，請直接從 [GitHub - Elasticsearch Grok Patterns](https://github.com/elastic/elasticsearch/tree/7.15/libs/grok/src/main/resources/patterns) 查看，特別是裡面的 `grok-patterns` 檔案，記錄了一些通用型的 patterns。

至於 Grok processor 的使用方式如下：

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
  "description" : "parse multiple patterns",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}"],
        "pattern_definitions" : {
          "FAVORITE_DOG" : "beagle",
          "FAVORITE_CAT" : "burmese"
        },
        "trace_match": true
      }
    }
  ]
},
"docs":[
  {
    "_source": {
      "message": "I love burmese cats!"
    }
  }
  ]
}
```

* `field`：從哪個欄位中讀取資料。
* `patterns`：Grok patterns 的描述字串，可以使用已經定義好的 patterns，也可以自己定義。(上例是將比對到的結果，存放到 `pet` 欄位中)
* `pattern_definitions`：可以自行指定相同字串比對的規則，值的宣告中，也可以使用 `|` 來定義多個值。
* `trace_match`：是否要回傳 `_grok_match_index` 這個 grok 執行結果的資訊。

回傳結果為：

```
{
  "docs": [
    {
      "doc": {
        "_type": "_doc",
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "message": "I love burmese cats!",
          "pet": "burmese"
        },
        "_ingest": {
          "_grok_match_index": "1",
          "timestamp": "2016-11-08T19:43:03.850+0000"
        }
      }
    }
  ]
```

> 注意：為了避免某些 Grok 的處理花太久的時間、佔用太多系統資源，Elasticsearch 當中有定義 `ingest.grok.watchdog.interval` 與 `ingest.grok.watchdog.max_execution_time` (預設值都是 1 秒)，來檢查及限制 Grok 任務的處理。

### Dissect

這個是與 Grok processor 類似的功能，但功能較單純一些，可以說在 Elasticsearch Ingest Pipeline 當中，更值得被優先選擇用來處理『非結構化的文字資料，整理成結構化的資料』的 processor，因為處理的方式較單純，不支援正規表示式 (Regular Expression)，也因此執行速度在不少情境下比 Grok 快非常多。

> 注意：效能考量，基本上可以當作如果能使用 Dissect 做到的，就不要用 Grok。

Dissect processor 的使用方式

```
{
  "dissect": {
    "field": "message",
    "pattern" : "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}"
   }
}
```

針對以下的文件內容

```
{
  "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171"
}
```

可以拆解並得到以下結構化的結果

```
"doc": {
  "_index": "_index",
  "_type": "_type",
  "_id": "_id",
  "_source": {
    "request": "/english/venues/cities/images/montpellier/18.gif",
    "auth": "-",
    "ident": "-",
    "verb": "GET",
    "@timestamp": "30/Apr/1998:22:00:52 +0000",
    "size": "3171",
    "clientip": "1.2.3.4",
    "httpversion": "1.0",
    "status": "200"
  }
}
```

使用 Dissert 時，主要是使用 `%{keyname}` 並在比對到值的時候，將抓到的值存放到對應 `keyname` 的欄位中，

另外可以配合使用以下這些主要的修飾符 (Modifier)：

| 修飾符           | 使用方式                          | 描述                                                                             | 文件                                                                                                                                 |
| ------------- | ----------------------------- | ------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------- |
| `->`          | `%{keyname1->}`               | 右方的字元不論重覆多少次，都忽略它，常用在右方有很多空格時，要一口氣忽略，就可以使用。                                    | [link](https://www.elastic.co/guide/en/elasticsearch/reference/7.15/dissect-processor.html#dissect-modifier-skip-right-padding)    |
| `+`           | `%{+keyname} %{+keyname}`     | 將多個比對到的欄位結果，合併在一起，可以透過 `append_separator` 指定這些值合併時的分隔符號，預設是空格。                 | [link](https://www.elastic.co/guide/en/elasticsearch/reference/7.15/dissect-processor.html#dissect-modifier-append-key)            |
| `+` with `/n` | `%{+keyname/2} %{+keyname/1}` | 和 `+` 一樣是合併多個結果，但多透過 `/n` 來指定這些值合併時的順序，`n` 從 1 開始。                             | [link](https://www.elastic.co/guide/en/elasticsearch/reference/7.15/dissect-processor.html#dissect-modifier-append-key-with-order) |
| `?`           | `%{?ignoreme}`                | 忽略比對到的這個結果，其實效果和 `%{}` 是一樣的，不過指定名字會更利於 Dissert pattern 的閱讀。                    | [link](https://www.elastic.co/guide/en/elasticsearch/reference/7.15/dissect-processor.html#dissect-modifier-named-skip-key)        |
| `*` and `&`   | `%{*r1} %{&r1}`               | 使用 `*` 將從比對到的結果當成欄位的名稱，並且欄位的值是 `&` 比對到的結果，例子中的 `r1` 代表的只是相同名字的 `*` 與 `&` 是同一組。 | [link](https://www.elastic.co/guide/en/elasticsearch/reference/7.15/dissect-processor.html#dissect-modifier-reference-keys)        |

### Date

由於 Elastic Common Schema 的規範之中，每個 event 都會需要有 `@timestamp` 的欄位，並且實務中要將日期的資料使用日期的格式儲存在 Elasticsearch 中也是很常見的需求，因此要先介紹 Date processor。

```
{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "field" : "initial_date",
        "target_field" : "timestamp",
        "formats" : ["ISO8601"],
        "timezone" : "{{{my_timezone}}}",
        "locale" : "{{{my_locale}}}"
      }
    }
  ]
}
```

Date Processor 擁有幾個重要的設定：

* `field`：從哪個欄位中讀取資料。
* `format`：依照哪一種日期格式來解析資料，支援常見在 JSON 中使用的日期格式 `ISO8601` 或是 timestamp 的數字表示方式 `UNIX`、`UNIX_MS` 或是使用 Java 的 date format 定義時間格式。
* `timezone`：支援指定時區。
* `locale`：支援日期格式中與地區語言相關的表示法，例如日星期、月份有些格式會用英文來表示。

### Convert

如果要將某個欄位的型態，轉換成另種型態，就可以使用 Convert processor。

```
PUT _ingest/pipeline/my-pipeline-id
{
  "description": "converts the content of the id field to an integer",
  "processors" : [
    {
      "convert" : {
        "field" : "id",
        "type": "integer"
      }
    }
  ]
}
```

* `field`：從哪個欄位中讀取資料。
* `type`：轉換成哪個指定的型態。

上述的例子就是將原本可能是字串型態的欄位，指定轉換成為 `integer` 的型態。

### Fingerprint

有時我們放入 Elasticsearch 的文件沒有能當作識別的唯一主鍵 (Primary Key)，但是有多個欄位合併在一起就能代表是唯一的複合鍵 (Composite Key)，當我們想避免資料重覆被 indexing 進入 Elasticsearch 時，又或是進入 Elasticsearch 之後，想要更有效的找出這些重覆的資料時，就是在事前加工，將這些欄位合併在一起產生出一個唯一的 Fingerprint (指紋)，就能當成識別使用。

Finterprint processor 的使用方式：

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "fingerprint": {
          "fields": ["user"],
          "target_field": "unique_key",
          "method": "SHA-1"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "user": {
          "last_name": "Smith",
          "first_name": "John",
          "date_of_birth": "1980-01-15",
          "is_active": true
        }
      }
    }
  ]
}
```

* `fields`：主要在這裡提供多個值，甚至如上方的例子能直接給予一個物件。
* `target_field`：產生的 Fingerprint 要存在哪個欄位中，預設是 `fingerprint` 的欄位。
* `method`：使用哪種 Hash Function，有支援 `MD5`、`SHA-1`、`SHA-256`、`SHA-512`、`MurmurHash3`。
* `salt`：有需要的話甚至可以另外指定 `salt` 的值加在 Hash Function 的運算之中。

### GeoIP

當我們有 IP 的資料，想要轉換成地理位置的資訊，在查詢時甚至能使用地圖的方式來呈現時，我們就可以使用 GeoIP processor：

```
{
  "description" : "Add geoip info",
  "processors" : [
    {
      "geoip" : {
        "field" : "ip"
      }
    }
  ]
}
```

當我們指定某個 IP 值，要透過 `geoip` processor 進行處理：

```
PUT my-index-00001/_doc/my_id?pipeline=geoip
{
  "ip": "8.8.8.8"
}
```

產生出來的結果就會是

```
{
  "found": true,
  "_index": "my-index-00001",
  "_type": "_doc",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 55,
  "_primary_term": 1,
  "_source": {
    "ip": "8.8.8.8",
    "geoip": {
      "continent_name": "North America",
      "country_name": "United States",
      "country_iso_code": "US",
      "location": { "lat": 37.751, "lon": -97.822 }
    }
  }
}
```

GeoIP processor 預設是使用 MaxMind 這間公司所提供免費版的資料庫，如果有要使用其他特定的資料庫來源，可以透過 `database_file` 來指定。

另外針對產生出來的欄位，可以透過 `target_field` 來指定名稱，甚至透過 `properties` 來決定產生出來的物件裡面要包含哪些 Geo 屬性。

### KV

KV 是指 Key/Value，能將在字串中同樣規則的 Key/Value 值給解析出來。

例如最常使用在 URL 的 Query String 當中，如果我們想把 Query String 裡的值都結構化，可以使用：

```
{
  "kv": {
    "field": "query_string",
    "field_split": "&",
    "value_split": "="
  }
}
```

就能將 URL 的格式

```
{
  "query_string": "q=elasticsearch&locale=en"
}
```

轉變成

```
"doc": {
  "_index": "_index",
  "_type": "_type",
  "_id": "_id",
  "_source": {
    "query_string": "q=elasticsearch&locale=en",
    "q": "elasticsearch",
    "locale": "en"
  }
}
```

也能透過 `target_field` 將所以拆解出來的結果收集在某一個欄位之中。

### Pipeline

如果我們定義了許多的 Pipeline，但希望能『模組化』的方式，將某些宣告重覆使用，就可以透過這個 Pipeline Processor 的方式，來組合其他定義好的 Processor。

例如我們先定義一個 `pipelineA`

```
PUT _ingest/pipeline/pipelineA
{
  "description" : "inner pipeline",
  "processors" : [
    {
      "set" : {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}
```

在另外的 `pipelineB` 有其他的處理要進行，但又希望使用到 `pipelineA` 所定義的部份，就可以像以下的方試來宣告：

```
PUT _ingest/pipeline/pipelineB
{
  "description" : "outer pipeline",
  "processors" : [
    {
      "pipeline" : {
        "name": "pipelineA"
      }
    },
    {
      "set" : {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}
```

### 其他的 Processors

其他還有許多好用的 Processors，例如：

* `drop`：刪除某個欄位。
* `set`：增加一個新欄位並填入指定的值。
* `urldecode`：常常收集到的 log 裡面的 URL 是有 encode 過的，當裡面有非英文的語言、或是特殊符號時，在 Log 的解讀上會很辛苦，可以先透過 `urldecode` processor 進行 URL decode。
* `uri_parts`：直接依照 URI 的標準，拆解出 `scheme`、`domain`、`port`、`path`、`query`、`extension`、`fragment`…等欄位。
* `user_agent`：將 User Agent 的字串，拆解出 `name`、`version`、`os`、`device`…等欄位。
* `script`：透過 painless 的語言，編寫處理的邏輯，很強大的 processor。
* `fail`：配合 `if` 的條件設定，可以在 ingest pipeline 時進行檢查，針對不符合要求的文件，拋出錯誤。

## 參考資料

1. [官方文件 - Elasticsearch Ingest Processor Reference](https://www.elastic.co/guide/en/elasticsearch/reference/7.15/processors.html)
