- 0. 要求
- 1. 安装
- 2. 使用
- 3. 插件助手
- 4. 参数
- 4.1. @type(必须的)
- 4.2. host(可选的)
- 4.3. port(可选的)
- 4.4. cloud_id
- 4.5. cloud_auth
- 4.6. emit_error_for_missing_id
- 4.7. hosts(可选的)
- 4.8. user,password(可选的)
- 4.9. path(可选的)
- 4.10. scheme(可选的)
- 4.11. ssl_verify
- 4.12. index_name(可选的)
- 4.13. logstash_format(可选的)
- 4.14. include_timestamp
- 4.15. logstash_prefix
- 4.16. logstash_prefix_separator
- 4.17. logstash_dateformat
- 4.18. pipeline
- 4.19. time_key_format
- 4.20. time_precision
- 4.21. time_key
- 4.22. time_key_exclude_timestamp
- 4.23. utc_index
- 4.24. suppress_type_name
- 4.25. target_index_key
- 4.26. target_type_key
- 4.27. target_index_affinity
- 4.28. template_name
- 4.29. template_file
- 4.30. templates
- 4.31. customize_template
- 4.32. rollover_index
- 4.33. index_date_pattern
- 4.34. deflector_alias
- 4.35. index_prefix
- 4.36. application_name
- 4.37. template_overwrite
- 4.38. max_retry_putting_template
- 4.39. fail_on_putting_template_retry_exceed
- 4.40. fail_on_detecting_es_version_retry_exceed
- 4.41. max_retry_get_es_version
- 4.42. request_timeout
- 4.43. reload_connections
- 4.44. reload_on_failure
- 4.45. resurrect_after
- 4.46. include_tag_key,tag_key
- 4.47. id_key
- 4.48. parent_key
- 4.49. routing_key
- 4.50. remove_keys
- 4.51. remove_keys_on_update
- 4.52. remove_keys_on_update_key
- 4.53. retry_tag
- 4.54. write_operation
- 4.55. time_parse_error_tag
- 4.56. reconnect_on_error
- 4.57. with_transporter_log
- 4.58. content_type
- 4.59. include_index_in_url
- 4.60. http_backend
- 4.61. http_backend_excon_nonblock
- 4.62. compression_level
- 4.63. prefer_oj_serializer
- 4.64. 客户端 / 主机证书选项
- 4.65. 代理支持
- 4.66. 缓冲选项
- 4.67. Hash flattening
- 4.68. 生成 Hash ID
- 4.69. 嗅探器类名
- 4.70. 选择器类名
- 4.71. 重新加载后
- 4.72. 验证客户端版本
- 4.73. 不可恢复的错误类型
- 4.74. verify_es_version_at_startup
- 4.75. default_elasticsearch_version
- 4.76. custom_headers
- 4.77. api_key
- 4.78. 没有看到你需要的配置?
- 4.79. 动态配置
- 4.80. 占位符
- 4.81. 多工作者
- 4.82. log_es_400_reason
- 4.83. suppress_doc_wrap
- 4.84. ignore_exceptions
- 4.85. exception_backup
- 4.86. bulk_message_request_threshold
- 4.87. enable_ilm
- 4.88. ilm_policy_id
- 4.89. ilm_policy
- 4.90. ilm_policies
- 4.91. ilm_policy_overwrite
- 4.92. truncate_caches_interval
- 4.93. use_legacy_template
- 4.94.
部分 - 4.94.1. include_chunk_id
- 4.94.2. chunk_id_key
- 4.95. @log_level(可选的)
- 5. 其他
- 6. 常用的输出 / 缓冲区参数
- 7. 故障排除
- 7.1. 无法向 Elasticsearch 发送事件
- 7.2. 无法看到详细的失败日志
- 7.3. 无法连接启用 TLS 的反向代理
- 7.4. 被拒绝的日志将永远重新提交,为什么?
- 7.5. 建议安装 typhoeus gem,为什么?
- 7.6. 停止在 k8s 上发送事件,为什么?
- 7.7. 随机发生 400 - Rejected by Elasticsearch,为什么?
- 7.8. Fluentd 似乎挂起了,不能连接 Elasticsearch,为什么?
- 7.9. 启用索引生命周期管理
- 7.10. 如何指定索引编解码器
- 7.11. 当 connect_write timeout reached 时,无法将日志推入Elasticsearch,为什么?
官方文档地址: elasticsearch
插件 GitHub 地址:fluent-plugin-elasticsearch
out_elasticsearch输出插件将记录写入 Elasticsearch。默认情况下,它使用 批量API 创建记录,可以实现在单个 API 调用中执行多个索引操作。这样可以减少开销,并极大地提高索引速度。这意味着当你第一次使用插件导入记录时,记录不会立即推送到 Elasticsearch。
当chunk_keys条件被满足时,记录将被发送到 Elasticsearch。如果要更改输出频率,请在chunk_keys中指定time,并在配置中指定timekey值。
0. 要求| fluent-plugin-elasticsearch | fluentd | ruby |
|---|---|---|
| >= 4.0.1 | >= v0.14.22 | >= 2.3 |
| >= 3.2.4 && < 4.0.1 | >= v0.14.22 | >= 2.1 |
| >= 2.0.0 && < 3.2.3 | >= v0.14.20 | >= 2.1 |
| < 2.0.0 | >= v0.12.0 | >= 1.9 |
注意:对于 v0.12 版本,您应该使用 1.x.y 的版本。如果遇到 1.x 版本的 bug,请发送补丁到 v0.12 分支。
注意:本文档是针对 fluent-plugin-elasticsearch 2.x 或更高版本的。对于 1.x 文档,请参阅 v0.12 分支。
注意:使用索引生命周期管理(ILM)特性需要安装 elasticsearch-xpack gem v7.4.0 或更高版本。
由于out_elasticsearch从 3.0.1 版开始就包含在td-agent的标准分发版中,所以td-agent用户不需要手动安装它。
如果你没有安装td-agent就安装了 Fluentd,请使用fluent-gem安装这个插件:
$ fluent-gem install fluent-plugin-elasticsearch2. 使用
在 Fluentd 配置中,使用@type elasticsearch。额外的配置是可选的:
@type elasticsearch host localhost port 9200 index_name fluentd type_name fluentd
注意:type_name参数将使用固定值_doc对于 Elasticsearch 7。
注意:type_name参数对 Elasticsearch 8 没有作用。
这个插件仅仅通过写入来创建 Elasticsearch 索引。考虑使用索引模板来控制索引的内容和方式。请参阅这个示例:
{
"mappings": {
"_default_": {
"_all": { "enabled": false },
"_source": { "compress": true },
"properties" : {
"event_data": { "type": "object", "store": "no" },
"@fields": { "type": "object", "dynamic": true, "path": "full" },
"@message": { "type": "string", "index": "analyzed" },
"@source": { "type": "string", "index": "not_analyzed" },
"@source_host": { "type": "string", "index": "not_analyzed" },
"@source_path": { "type": "string", "index": "not_analyzed" },
"@tags": { "type": "string", "index": "not_analyzed" },
"@timestamp": { "type": "date", "index": "not_analyzed" },
"@type": { "type": "string", "index": "not_analyzed" }
}
}
},
"settings": {
"index.cache.field.type" : "soft",
"index.refresh_interval": "5s",
"index.store.compress.stored": true,
"index.number_of_shards": "3",
"index.query.default_field": "querystring",
"index.routing.allocation.total_shards_per_node": "2"
},
"template": "logstash-*"
}
3. 插件助手
- event_emitter
- compat_parameters
这个选项必须始终是elasticsearch。
4.2. host(可选的)Elasticsearch 节点的主机名(默认为localhost)。
host user-custom-host.domain # 默认为 localhost
可以通过该参数指定 Elasticsearch 主机。
注意:从 v3.3.2 开始,host 参数支持内置占位符。如果你想在运行时使用elasticsearch_dynamic输出插件将事件动态发送到不同的主机,请考虑使用普通的elasticsearch输出插件。关于内置占位符的更多细节,请参阅占位符一节。
要使用主机上的 IPv6 地址参数,可以使用以下样式:
字符串样式
要使用字符串样式,必须使用引号引用 IPv6 地址,以防止被解释为 JSON:
host "[2404:7a80:d440:3000:192a:a292:bd7f:ca10]"
原始样式
也可以指定原始 IPv6 地址。这将被处理为[specified IPv6 address]:
host 2404:7a80:d440:3000:192a:a292:bd7f:ca104.3. port(可选的)
Elasticsearch 节点的端口号(默认值9200)。
port 9201 # 默认为 9200
通过该参数可以指定 Elasticsearch 端口。
4.4. cloud_idcloud_id test-dep:ZXVyb3BlLXdlc3QxLmdjcC5jbG91ZC5lcy5pbyRiYZTA1Ng==
可通过该参数指定 Elasticsearch cloud_id。
如果指定cloud_id选项,则需要cloud_auth选项。如果指定cloud_id选项,host、post、user和password选项将被忽略。
4.5. cloud_authcloud_auth 'elastic:slkjdaooewkd87iqQ2O8EQYV'
可以通过该参数指定 Elasticsearch cloud_auth。
4.6. emit_error_for_missing_idemit_error_for_missing_id true
当write_operation被配置为除index之外的任何值时,设置此值为true将导致插件emit_error_event的任何记录不包括_id字段。默认的(false)行为是静默地删除记录。
4.7. hosts(可选的)hosts host1:port1,host2:port2,host3:port3
可以指定多个 Elasticsearch 主机,分隔符为","。
如果你指定多个主机,这个插件将负载均衡的更新到 Elasticsearch。这是一个 elasticsearch-ruby 特性,默认策略是轮询。
如果指定hosts选项,host和port选项将被忽略。
host user-custom-host.domain # ignored port 9200 # ignored hosts host1:port1,host2:port2,host3:port3
如果指定hosts选项而不指定端口,则使用port选项。
port 9200 hosts host1:port1,host2:port2,host3 # port3 is 9200
注意:如果你将使用 https 方案,不要在你的主机ie中包括"https://"。主机"https://domain",这将导致 ES 集群不可达,你将收到一个错误"Can not reach Elasticsearch cluster"。
注意:在 v2.8.5 之前,允许在 URL 中嵌入用户名/密码。但是,从 v2.8.6 开始,该语法已被弃用,因为它会导致严重的连接问题。请将您的设置迁移到使用user和password字段。
IPv6地址
当需要指定 IPv6 地址时,必须同时指定模式:
hosts http://[2404:7a80:d440:3000:de:7311:6329:2e6c]:port1,http://[2404:7a80:d440:3000:de:7311:6329:1e6c]:port2,http://[2404:7a80:d440:3000:de:6311:6329:2e6c]:port3
如果你没有同时指定带有模式的主机,Elasticsearch 插件会投诉它们的 URI 无效。
4.8. user,password(可选的)连接到 Elasticsearch 节点的登录凭据(默认为nil):
user demo password secret
可以指定 HTTP 基本认证的用户和密码。
这个插件将在%{}占位符中转义所需的 URL 编码字符。
user %{demo+}
password %{@secret}
4.9. path(可选的)
Elasticsearch 的 REST API 端点用于发布写请求(默认为nil)。
path /elastic_search/4.10. scheme(可选的)
如果 Elasticsearch 端点支持 SSL(默认为http),请指定https。
scheme https4.11. ssl_verify
指定ssl_verify false以跳过ssl验证(默认为true)。
4.12. index_name(可选的)要写入事件的索引名称(默认值:fluentd)。
此选项支持 Fluentd 插件 API 的占位符语法。例如,如果你想按标签划分索引,可以这样指定:
index_name fluentd.${tag}
下面是一个更实际的例子,它根据标签和时间戳对 Elasticsearch 索引进行了分区:
index_name fluentd.${tag}.%Y%m%d
时间占位符需要在chunk_keys中设置标签和时间。同样,它需要为 chunk 的时间片指定timekey:
timekey 1h # chunks per hours ("3600" also available)
有关缓冲区选项的更多信息,请查看缓冲区部分配置。
4.13. logstash_format(可选的)logstash_format true # 默认为 false
这意味着将数据写入 Elasticsearch 索引与 Logstash 所调用的索引兼容。通过这样做,你可以利用 Kibana。请参阅logstash_prefix和logstash_dateformat来定制此索引名称模式。索引名称将是#{logstash_prefix}-#{formatted_date}
将该选项设置为true将忽略index_name设置。默认索引名称前缀为logstash-。
4.14. include_timestampinclude_timestamp true # 默认为 false
将@timestamp字段添加到日志中,遵循logstash_format所做的所有设置,但不限制index_name。这允许用户在 ES 中使用别名记录日志,并利用滚动 API。
4.15. logstash_prefix当logstash_format为 true 时,用于写入事件的 logstash 前缀索引名称(默认为 logstash)。
logstash_prefix mylogs # 默认为 "logstash"4.16. logstash_prefix_separator
logstash_prefix_separator _ # 默认为 "-"4.17. logstash_dateformat
当logstash_format设置为true时,生成目标索引名称的索引时间格式。缺省情况下,记录被插入到索引logstash-YYYY.MM.DD中。这个选项以及logstash_prefix允许我们插入到指定的索引中,例如每月索引的mylogs-YYYYMM。
logstash_dateformat %Y.%m. # 默认为 "%Y.%m.%d"4.18. pipeline
仅在 ES>= 5.x 中可以使用该参数。这个参数是用来设置要添加到请求中的 elasticsearch 的管道 id,你可以配置摄取节点。
pipeline pipeline_id4.19. time_key_format
时间戳字段的格式(@timestamp或time_key指定的时间戳)。只有当logstash_format为true时,该参数才有效果,因为它只影响我们写入的索引的名称。有关该格式的值的信息,请参阅Time#strftime。
如果您的大部分日志都使用相同的格式,那么将其设置为已知格式可以极大地提高日志的接收速度。如果解析此格式时出现错误,则时间戳将默认为输入时间。如果您使用的是Ruby 2.0或更高版本,您可以通过安装"strptime" gem来进一步提高性能:fluent-gem install strptime。
例如,以秒级精度解析ISO8601时间:
time_key_format %Y-%m-%dT%H:%M:%S.%N%z4.20. time_precision
如果记录不包含time_key,则定义秒以下的时间精度,以保留路由事件的time部分。
例如,如果你的输入插件没有在记录中包含一个time_key,但它能够在发出事件时传递time给路由器(AWS CloudWatch 事件就是一个例子),然后,这个设置将允许您保留这些事件的亚秒级时间分辨率。这就是 fluent-plugin-cloudwatch-ingest 。
4.21. time_key默认情况下,当以 Logstash 格式插入记录时,@timestamp将动态创建日志输入时的时间。如果您想使用自定义时间,请在记录中包含@timestamp。
{"@timestamp": "2014-04-07T000:00:00-00:00"}
你可以指定一个选项time_key(就像 tail Input Plugin 中描述的选项)来替换@timestampkey。
假设你有设置:
logstash_format true time_key vtm
你的输入是:
{
"title": "developer",
"vtm": "2014-12-19T08:01:03Z"
}
输出将是:
{
"title": "developer",
"@timestamp": "2014-12-19T08:01:03Z",
"vtm": "2014-12-19T08:01:03Z"
}
请参阅time_key_exclude_timestamp以避免添加@timestamp。
4.22. time_key_exclude_timestamptime_key_exclude_timestamp false
默认情况下,设置time_key会将值复制到一个额外的字段@timestamp。当设置time_key_exclude_timestamp为true时,不添加额外字段。
4.23. utc_indexutc_index true
默认情况下,使用 UTC(协调世界时)将记录插入索引logstash-YYMMDD。如果将utc_index设置为false,该选项允许使用本地时间。
4.24. suppress_type_name在 Elasticsearch 7.x,Elasticsearch 集群提示以下类型的删除警告:
{"type": "deprecation", "timestamp": "2020-07-03T08:02:20,830Z", "level": "WARN", "component": "o.e.d.a.b.BulkRequestParser", "cluster.name": "docker-cluster", "node.name": "70dd5c6b94c3", "message": "[types removal] Specifying types in bulk requests is deprecated.", "cluster.uuid": "NoJJmtzfTtSzSMv0peG8Wg", "node.id": "VQ-PteHmTVam2Pnbg7xWHw" }
可以用以下方法来抑制:
suppress_type_name true4.25. target_index_key
告诉这个插件在这个键下查找记录中要写入的索引名,而不是其他机制。键可以使用点('.')作为分隔符指定为嵌套记录的路径。
如果它存在于记录中(且值是非假值),则该值将被用作写入记录的索引名,然后在输出之前从记录中删除;如果没有找到,那么它将按照配置使用logstash_format或index_name来设置索引。
假设您有以下设置:
target_index_key @target_index index_name fallback
如果你的输入是:
{
"title": "developer",
"@timestamp": "2014-12-19T08:01:03Z",
"@target_index": "logstash-2014.12.19"
}
输出将是:
{
"title": "developer",
"@timestamp": "2014-12-19T08:01:03Z",
}
并且该记录将被写入指定的索引logstash-2014.12.19而不是fallback。
4.26. target_type_key与target_index_key配置类似,在该键(或嵌套记录)下的记录中找到要写入的类型名称。如果在记录中没有找到 key - 回退到type_name(默认为"fluentd")
4.27. target_index_affinity允许插件动态选择基于 logstash 时间的目标索引 update/upsert 操作基于已经索引的数据,而不是当前索引的时间。
target_index_affinity true # 默认为 false
默认情况下,插件根据当前时间写入 logstash 格式索引的数据。例如,在可能的数据被写入新创建的索引后,基于每日的索引。当数据来自单一数据源且索引后没有更新时,这通常是可以的。
但是,如果在用例中,数据在索引之后也要更新,并且使用id_key唯一地标识用于更新的文档,Logstash 格式用于方便的数据管理和保留。在索引完成数据之后立即进行更新(所有的数据都不能从单一数据源获得),并且在以后的时间点上不再进行更新。在这种情况下,问题发生在索引旋转时,写入两个具有相同id_key值的索引可能会发生。
这个设置将通过使用id_key值的弹性搜索的 id 查询来搜索现有数据(使用logstash_prefix和logstash_prefix_separator索引模式,例如logstash-*)。找到的数据的索引用于update/upsert。当没有找到数据时,数据会像往常一样写入当前 logstash 索引。
该设置需要进行以下其他设置:
logstash_format true id_key myId # 数据上的某些字段,以惟一地标识数据 write_operation upsert # upsert 或 update
假设你有以下情况,你有两个不同的 match,消费数据从 2 个不同的 Kafka topic 独立,但在时间上彼此关闭(顺序未知)。
@type elasticsearch ... id_key myId write_operation upsert logstash_format true logstash_dateformat %Y.%m.%d logstash_prefix myindexprefix target_index_affinity true ... @type elasticsearch ... id_key myId write_operation upsert logstash_format true logstash_dateformat %Y.%m.%d logstash_prefix myindexprefix target_index_affinity true ...
如果您的第一个(data1)输入是:
{
"myId": "myuniqueId1",
"datafield1": "some value",
}
第二个(data2)输入是:
{
"myId": "myuniqueId1",
"datafield99": "some important data from other source tightly related to id myuniqueId1 and wanted to be in same document.",
}
今天的日期是10.05.2021,所以当 data1 和 data2 在今天消耗时,数据会写入索引myindexprefix-2021.05.10。但是当我们接近索引旋转时,data1 被消耗并在2021-05-10T23:59:55.59707672Z被索引,data2 被稍晚一点在2021-05-11T00:00:58.222079Z被消耗,即,logstash 索引已被旋转,通常 data2 将被写入索引myindexprefix-2021.05.11。但是将target_index_affinity设置为true后,data2 现在会被写入索引myindexprefix-2021.05.10到与 data1 相同的文档中,从而避免重复文档。
4.28. template_name要定义的模板的名称。如果给定名称的模板已经存在,它将保持不变,除非设置了template_overwrite,在这种情况下模板将被更新。
这个参数连同template_file允许插件的行为类似于 Logstash(它在创建时安装一个模板),因此原始记录是可用的。见 https://github.com/uken/fluent-plugin-elasticsearch/issues/33。
template_file也必须指定。
4.29. template_file包含要安装的模板文件的路径。
template_name也必须指定。
4.30. templates指定散列形式的索引模板。可以包含多个模板。
templates { "template_name_1": "path_to_template_1_file", "template_name_2": "path_to_template_2_file"}
注意:在 ES plugin v4.1.2 之前,如果设置了template_file和template_name,则该参数将被忽略。在 4.1.3 或更高版本中,template_file和template_name可以和templates一起工作。
4.31. customize_template指定要以散列形式替换的字符串及其值。可以包含多个键值对,它们将在指定的template_file中被替换。此设置仅创建模板,若要添加翻转索引,请检查rollover_index配置。
customize_template {"string_1": "subs_value_1", "string_2": "subs_value_2"}
如果设置了template_file和template_name,则该参数生效,否则忽略。
4.32. rollover_index当需要创建具有翻转功能的索引时,指定为true。它创建的索引格式为
rollover_index true # defaults to false
如果设置了customize_template,则该参数将生效,否则将被忽略。
4.33. index_date_pattern指定此参数可覆盖用于创建翻转索引的索引日期模式。默认是使用"now/d",例如:
该设置只有在与enable_ilm设置结合使用时才生效。
index_date_pattern "now/w{xxxx.ww}" # 默认为 "now/d"
如果在index_date_pattern中指定空字符串(""),则不使用索引日期模式。Elasticsearch 插件只创建
如果设置了customize_template,则该参数将生效,否则将被忽略。
4.34. deflector_alias指定将分配给创建的翻转索引的偏转器别名。这在使用 Elasticsearch 翻转 API 时非常有用。
deflector_alias test-current
如果设置了rollover_index,则该参数将被忽略。
注意:从 4.1.1 开始,defector_alias被禁止与enable_ilm一起使用。
4.35. index_prefix此参数被标记为过时。当不与logstash_format一起使用时,考虑使用index_name来指定 ILM 目标索引。当将logstash_format指定为true时,考虑使用logstash_prefix指定 ILM 目标索引前缀。
4.36. application_name指定要创建的翻转索引的应用程序名称。
application_name default # 默认为 "default"
如果设置了enable_ilm,则该参数将生效,否则将被忽略。
4.37. template_overwrite总是更新模板,即使它已经存在。
template_overwrite true # defaults to false
如果设置了该参数,还必须指定其中一个template_file或templates。
4.38. max_retry_putting_template您可以指定重试放置模板的次数。
当 Elasticsearch 插件无法连接 Elasticsearch 来放置模板时,这是有用的。通常,启动集群的 Elasticsearch 容器要比启动 Fluentd 容器慢得多。
max_retry_putting_template 15 # 默认为 104.39. fail_on_putting_template_retry_exceed
指示当max_retry_putting_template被超过时是否失败。如果你有多个输出插件,你可以设置这个属性使 fluentd 在启动后不会失败。
fail_on_putting_template_retry_exceed false # 默认为 true4.40. fail_on_detecting_es_version_retry_exceed
指示当max_retry_get_es_version超出时是否失败。如果您想使用回退机制来获取 ELasticsearch 版本,你可以设置这个属性使 fluentd 在启动后不会失败。
fail_on_detecting_es_version_retry_exceed false
应该使用以下参数:
verify_es_version_at_startup true max_retry_get_es_version 2 # 大于 0。 default_elasticsearch_version 7 # 此版本在发生回退时使用。4.41. max_retry_get_es_version
可以指定重新获取 Elasticsearch 版本的次数。当 Elasticsearch 插件无法连接 Elasticsearch 获取 Elasticsearch 版本时,这是有用的。通常,启动集群的 Elasticsearch 容器要比启动 Fluentd 容器慢得多。
max_retry_get_es_version 17 # defaults to 154.42. request_timeout
可以指定 HTTP 请求超时时间。
当 Elasticsearch 在默认的5 秒内无法对批量请求返回响应时,这非常有用。
request_timeout 15s # defaults to 5s4.43. reload_connections
您可以调整elasticsearch-transport主机重载特性的工作方式。默认情况下,它将每隔 10,000 次请求从服务器重新加载主机列表,以分散负载。如果您的 Elasticsearch 集群位于反向代理之后,这可能是个问题,因为 Fluentd 进程可能无法直接访问 Elasticsearch 节点。
reload_connections false # 默认为 true4.44. reload_on_failure
指示elasticsearch-transport将尝试在发出请求时出现故障时重新加载节点地址,这对于快速从地址列表中删除死节点非常有用。
reload_on_failure true # 默认为 false4.45. resurrect_after
您可以在elasticsearch-transport中设置来自elasticsearch-transport池的死连接恢复的频率。
resurrect_after 5s # 默认为 60s4.46. include_tag_key,tag_key
include_tag_key true # 默认为 false tag_key tag # 默认为 tag
这将在 JSON 记录中添加 Fluentd 标记。例如,如果你有这样的配置:
@type elasticsearch include_tag_key true tag_key _key
插入到 Elasticsearch 中的记录将是:
{"_key": "my.logs", "name": "Johnny Doeie"}
4.47. id_key
id_key request_id # 在 ES 中使用“request_id”字段作为记录 id
默认情况下,所有插入到 Elasticsearch 中的记录都会得到一个随机的 _id。此选项允许使用记录中的字段作为标识符。
以下记录{"name": "Johnny", "request_id": "87d89af7daffad6"}将触发以下 Elasticsearch 命令:
{ "index" : { "_index": "logstash-2013.01.01", "_type": "fluentd", "_id": "87d89af7daffad6" } }
{ "name": "Johnny", "request_id": "87d89af7daffad6" }
Fluentd 使用新的且唯一的_id值重新触发在 Elasticsearch 中索引/摄取失败的事件,这意味着拥塞的 Elasticsearch 集群拒绝事件(例如由于命令队列溢出)将导致 Fluentd 重新发出带有新_id的事件,但是Elasticsearch 实际上可能同时处理(或更多)尝试(有一些延迟),并在索引中创建重复的事件(因为每个事件都有一个唯一的_id值),一个可能的解决方案是使用fluent-plugin-genhashvalue插件在每个事件的记录中生成一个唯一的_hash键,这个_hash记录可以作为id_key来防止 Elasticsearch 创建重复的事件。
id_key _hash
fluent-plugin-genhashvalue的示例配置(查看插件的文档以了解更多细节)。
@type genhashvalue keys session_id,request_id hash_type md5 # md5/sha1/sha256/sha512 base64_enc true base91_enc false set_key _hash separator _ inc_time_as_key true inc_tag_as_key true
为了避免散列冲突和数据丢失,在选择事件记录中用于计算散列的键时需要仔细考虑。
使用嵌套的 key
还支持嵌套键指定语法。使用以下配置:
id_key $.nested.request_id
和下面的嵌套记录:
{"nested":{"name": "Johnny", "request_id": "87d89af7daffad6"}}
将触发以下 Elasticsearch 命令:
{"index":{"_index":"fluentd","_type":"fluentd","_id":"87d89af7daffad6"}}
{"nested":{"name":"Johnny","request_id":"87d89af7daffad6"}}
请注意哈希平滑可能是冲突嵌套记录特性。
4.48. parent_keyparent_key a_parent # 在elasticsearch命令中使用“a_parent”字段值设置_parent
如果你的输入是:
{ "name": "Johnny", "a_parent": "my_parent" }
Elasticsearch 命令将是:
{ "index" : { "_index": "****", "_type": "****", "_id": "****", "_parent": "my_parent" } }
{ "name": "Johnny", "a_parent": "my_parent" }
如果没有配置parent_key或者输入记录中没有parent_key,则不会发生任何事情。
使用嵌套的 key
还支持嵌套键指定语法。使用以下配置:
parent_key $.nested.a_parent
和下面的嵌套记录:
{"nested":{ "name": "Johnny", "a_parent": "my_parent" }}
将触发以下 Elasticsearch 命令:
{"index":{"_index":"fluentd","_type":"fluentd","_parent":"my_parent"}}
{"nested":{"name":"Johnny","a_parent":"my_parent"}}
请注意哈希平滑可能是冲突嵌套记录特性。
4.49. routing_key类似于parent_key配置,如果设置了routing_key并且输入事件中确实存在字段,将在 elasticsearch 命令中添加_routing。
4.50. remove_keysparent_key a_parent routing_key a_routing remove_keys a_parent, a_routing # a_parent和a_routing字段不会被发送到elasticsearch4.51. remove_keys_on_update
当记录被更新时,在更新时删除键不会更新 elasticsearch 中配置的键。只有当写操作是更新或 upsert 时,此设置才有效。
如果写设置是 upsert,那么这些键只在记录被更新时被删除,如果记录不存在(根据 id),那么所有键都被索引。
remove_keys_on_update foo,bar4.52. remove_keys_on_update_key
这个设置允许在每个记录中为remove_keys_on_update配置一个键,其工作方式与target_index_key非常相似。在 elasticsearch 中建立索引之前,已配置的键被删除。如果记录中同时存在remove_keys_on_update和remove_keys_on_update_key,那么记录中的键将被使用,如果remove_keys_on_update_key不存在,那么remove_keys_on_update的值将被用作回退。
remove_keys_on_update_key keys_to_skip4.53. retry_tag
此设置允许自定义消息路由,以响应批量请求失败。默认行为是使用提供的相同标记发出失败记录。当设置为非nil值时,失败的消息会用指定的标签发出:
retry_tag 'retry_es'
注意:retry_tag是可选的。如果你更愿意使用 labels 来重路由重试,可以在你的 fluent elasticsearch 插件配置中添加一个 label(例如'@label @SOMELABEL')。默认情况下,重试记录将提交到 ROOT 标签以供重试,这意味着记录将从头开始通过流管道。如果管道是幂等的,这可能也不是问题——也就是说,您可以不做任何更改地再次处理一条记录。使用 tag 或 label 以确保您的重试记录不会被 fluentd 处理管道再次处理。
4.54. write_operationwrite_operation 的值可以是下表中的任意一个:
| 操作 | 描述 |
|---|---|
| index(默认的) | 新数据被添加,而现有数据(基于其 id)被替换(变换索引)。 |
| create | 添加新数据 - 如果数据已经存在(基于其 id),则跳过该操作。 |
| update | 更新现有数据(基于其 id)。如果没有找到数据,则跳过该操作。 |
| upsert | 如果数据不存在则称为合并或插入,如果数据存在则更新(基于其 id)。 |
请注意,id 在创建、更新和 upsert 场景中是必需的。如果没有 id,消息将被删除。
4.55. time_parse_error_taglogstash_format为true时,elasticsearch 插件解析时间戳字段以生成索引名。如果记录有无效的时间戳值,这个插件将发送一个带有time_parse_error_tag配置的标签的错误事件到@ERROR标签。
默认值为Fluent::ElasticsearchOutput::TimeParser.error表示向后兼容性。::分隔标签不适合 tag 路由,因为一些插件假设 tag 被.分隔。我们建议像time_parse_error_tag es_plugin.output.time.error那样设置这个参数。我们将更改默认值为.分隔的 tag。
4.56. reconnect_on_error指示插件应该在任何错误时重置连接(在下次发送时重新连接)。默认情况下,它只在host unreachable exceptions时重新连接。我们建议在存在 elasticsearch 屏蔽时将此设置为true。
reconnect_on_error true # defaults to false4.57. with_transporter_log
这是用于获取传输层日志的调试选项。为向后兼容,默认值为false。
如果你开始调试这个插件,我们建议设置为true。
with_transporter_log true4.58. content_type
使用content_type application/x-ndjson,elasticsearch 插件在有效负载中添加application/x-ndjson作为Content-Type。
Elasticsearch 请求的默认Content-Type是application/json。如果你不使用模板,它建议设置content_type application/x-ndjson。
content_type application/x-ndjson4.59. include_index_in_url
如果将此选项设置为true,Fluentd 将在请求 URL 中(而不是在请求主体中)显示索引名称。可以使用此选项强制基于 URL 的访问控制。
include_index_in_url true4.60. http_backend
使用http_backend typhoeus,elasticsearch 插件使用 typhoeus faraday http backend。Typhoeus 可以处理 HTTP 保持存活。
默认值是excon,它是 elasticsearch 插件的默认 http_backend。
http_backend typhoeus4.61. http_backend_excon_nonblock
使用http_backend_excon_nonblock false时,elasticsearch 插件使用非block=false的excon。如果你在 jRuby 中使用 elasticsearch 插件用于 https,你可能需要考虑设置false以避免以下问题。
- https://github.com/geemus/excon/issues/106
- https://github.com/jruby/jruby-ossl/issues/19
但对于所有其他情况,强烈建议设置为true,以避免在 https://github.com/uken/fluent-plugin-elasticsearch/issues/732 中报告的进程挂起问题。
默认值为true:
http_backend_excon_nonblock false4.62. compression_level
您可以添加 gzip 压缩输出数据。在这种情况下,应该选择default_compression、best_compression或best speed选项。默认情况下没有压缩,该选项的默认值是no_compression。
compression_level best_compression4.63. prefer_oj_serializer
在默认行为下,Elasticsearch 客户端使用Yajl作为 JSON 编码器/解码器。Oj是替代高性能 JSON 编码器/解码器。当此参数设置为true时,Elasticsearch 客户端使用Oj作为 JSON 编码器/解码器。
默认值是false。
prefer_oj_serializer true4.64. 客户端 / 主机证书选项
需要验证 Elasticsearch 的证书吗?可以使用以下参数指定 CA,而不是使用环境变量。
ca_file /path/to/your/ca/cert
您的 Elasticsearch 集群想要验证客户端连接吗?您可以指定以下参数来为您的连接使用您的客户端证书、密钥和密钥密码。
client_cert /path/to/your/client/cert client_key /path/to/your/private/key client_key_pass password
当需要配置 SSL/TLS 版本时,可指定ssl_version参数。
ssl_version TLSv1_2 # or [SSLv23, TLSv1, TLSv1_1]
如果启用 SSL/TLS,可能需要设置ssl_version。
在 Elasticsearch 插件 v4.0.2 与 Ruby 2.5 或更高版本的组合中,Elasticsearch 插件还支持ssl_max_version和ssl_min_version。
ssl_max_version TLSv1_3 ssl_min_version TLSv1_2
Elasticsearch 插件在 TLS 传输时使用 TLSv1.2 作为最小 ssl 版本,TLSv1.3 作为最大 ssl 版本。注意,当它们在 Elastissearch 插件配置中使用时,ssl_version不是用来设置 TLS 版本的。
如果在 Elasticsearch 插件配置中没有指定,则ssl_max_version和ssl_min_version设置为:
在 Elasticsearch 插件 v4.0.8 或更高版本的 Ruby 2.5 或更高版本的环境中,ssl_max_version应该是TLSv1_3,ssl_min_version应该是TLSv1_2。
在 Ruby 2.5 或更高版本的环境中,从 Elasticsearch 插件 v4.0.4 到 v4.0.7,ssl_version的值将用于ssl_max_version和ssl_min_version。
4.65. 代理支持从 0.8.0 版本开始,这个 gem 使用 excon,它支持带有环境变量的代理 - https://github.com/excon/excon#proxy-support。
4.66. 缓冲选项fluentd -plugin-elasticsearch扩展了 Fluentd 的内置 Output 插件,并使用了compat_parameters插件助手。它增加了以下选项:
buffer_type memory flush_interval 60s retry_limit 17 retry_wait 1.0 num_threads 1
选项buffer_chunk_limit的值不应该超过http.max_content_length的值在你的 Elasticsearch 设置(默认是 100mb)。
注意:如果您使用或评估 Fluentd v0.14,您也可以使用
注意:如果你在 v0.12 中使用disable_retry_limit或者在 v0.14 或更高版本中使用retry_forever,请小心使用内存。
4.67. Hash flattening如果将对象和具体值发送到同一个字段,Elasticsearch 会报错。例如,你可能有这样的日志,从不同的地方:
{"people" => 100} {"people" => {"some" => "thing"}}
第二行日志将被 Elasticsearch 解析器拒绝,因为对象和具体值不能位于同一个字段中。为了解决这个问题,可以启用散列平滑 hash flattening。
flatten_hashes true flatten_hashes_separator _
这将生成如下所示的 elasticsearch 输出:
{"people_some" => "thing"}
请注意,flattener此时不处理数组。
4.68. 生成 Hash ID默认情况下,fluentd elasticsearch 插件不发出带有_id字段的记录,让elasticsearch 在记录被索引时生成唯一的_id。当 Elasticsearch 集群拥塞,响应时间开始超过配置的request_timeout时,fluentd elasticsearch 插件将重新发送相同的批量请求。因为 Elasticsearch 不能告诉它实际上是同一个请求,所以请求中的所有文档都会再次被索引,导致数据重复。在某些场景中,这可能导致无限循环生成相同数据的多个副本。
绑定的elasticsearch_genid过滤器可以为每条记录生成一个唯一的_hash键,这个键可以传递给 elasticsearch 插件中的id_key参数,以向 elasticsearch 传递请求的唯一性,这样重复的请求将被拒绝或简单地替换现有的记录。下面是一个配置示例:
4.69. 嗅探器类名@type elasticsearch_genid hash_id_key _hash # storing generated hash id key (default is _hash) @type elasticsearch id_key _hash # specify same key name which is specified in hash_id_key remove_keys _hash # Elasticsearch doesn't like keys that start with _ # other settings are omitted.
当 Fluentd 与所有 Elasticsearch 服务器有直接连接时,Elasticsearch::Transport类使用的默认嗅探器可以很好地工作,并且可以有效地使用_nodes API。当 Fluentd 必须通过负载均衡器或代理进行连接时,这种方法不能很好地工作。参数sniffer_class_name使您能够提供自己的 Sniffer 类来实现所需的任何连接重新加载逻辑。此外,还有一个新的Fluent::Plugin::ElasticsearchSimpleSniffer类,它重用配置中给出的主机,通常是负载均衡器或代理的主机名。例如,这样的配置会导致每 100 个操作重新加载到logging-es的连接:
host logging-es port 9200 reload_connections true sniffer_class_name Fluent::Plugin::ElasticsearchSimpleSniffer reload_after 100
小贴士
包含的嗅探器类不是必需的out_elasticsearch。您应该告诉 Fluentd 嗅探器类存在于何处。
如果使用td-agent,必须在TD_AGENT_DEFAULT文件中加入以下几行:
sniffer=$(td-agent-gem contents fluent-plugin-elasticsearch|grep elasticsearch_simple_sniffer.rb) TD_AGENT_OPTIONS="--use-v1-config -r $sniffer"
如果直接使用 Fluentd,则必须将以下行作为 Fluentd 命令行选项传递:
sniffer=$(td-agent-gem contents fluent-plugin-elasticsearch|grep elasticsearch_simple_sniffer.rb) $ fluentd -r $sniffer [AND YOUR OTHER OPTIONS]4.70. 选择器类名
当 Fluentd 应该表现为轮循和随机选择器情况时,Elasticsearch::Transport类使用的默认选择器工作得很好。当 Fluentd 从耗尽的 ES 集群回退到正常的 ES 集群时,这种方法不能很好地工作。参数selector_class_name使您能够提供自己的 Selector 类来实现所需的任何选择节点逻辑。
下面的配置是使用插件内置的ElasticseatchFallbackSelector:
hosts exhausted-host:9201,normal-host:9200 selector_class_name "Fluent::Plugin::ElasticseatchFallbackSelector"
小贴士
默认情况下,out_elasticsearch中需要包含的选择器类。但是,在out_elasticsearch中不需要您的自定义选择器类。您应该告诉 Fluentd 选择器类在哪里。
如果使用td-agent,必须在TD_AGENT_DEFAULT文件中加入以下几行:
selector=/path/to/your_awesome_selector.rb TD_AGENT_OPTIONS="--use-v1-config -r $selector"
如果直接使用 Fluentd,则必须将以下行作为 Fluentd 命令行选项传递:
selector=/path/to/your_awesome_selector.rb $ fluentd -r $selector [AND YOUR OTHER OPTIONS]4.71. 重新加载后
当reload_connections为true时,这是整型的操作数,之后插件将重新加载连接。缺省值是10000。
4.72. 验证客户端版本当您使用不匹配的 Elasticsearch 服务器和客户端库时,fluent-plugin-elasticsearch 无法将数据发送到 Elasticsearch。默认值为false。
validate_client_version true4.73. 不可恢复的错误类型
默认的unrecoverable_error_type参数是严格设置的。因为es_rejected_execution_exception是由超过 Elasticsearch 的线程池容量引起的。高级用户可以增加它的容量,但普通用户应该遵循默认行为。
如果您想增加它并强制重试批量请求,请考虑更改unrecoverable_error_type参数的默认值。
在elasticsearch.yml修改thread_pool.bulk.queue_size的默认值。例如:
thread_pool.bulk.queue_size: 1000
然后,从unrecoverable_error_types参数中删除es_rejected_execution_exception:
unrecoverable_error_types ["out_of_memory_error"]4.74. verify_es_version_at_startup
因为 Elasticsearch 插件应该改变每个主要版本的行为。
例如,Elasticsearch 6 开始禁止在一个索引中使用多个 type_name,而Elasticsearch 7 将只处理 index 中名字为_doc的 type_name。
如果你想禁用在启动时验证 Elasticsearch 版本,设置为false。
当使用以下配置时,ES 插件打算与 Elasticsearch 6 通信。
verify_es_version_at_startup false # 默认值为 true。 default_elasticsearch_version 64.75. default_elasticsearch_version
个参数改变了 ES 插件默认的 Elasticsearch 版本。缺省值是 5。
4.76. custom_headers此参数为请求添加额外的头。默认值为{}。
custom_headers {"token":"secret"}
4.77. api_key
该参数增加了认证头。默认值为nil。
api_key "ElasticsearchAPIKEY"4.78. 没有看到你需要的配置?
我们尽量保持这个插件的范围小,不添加太多配置选项。如果您认为某个选项对其他人有用,请随意打开一个问题或贡献一个 Pull Request。
或者,考虑使用fluent-plugin-forest。例如,配置多个标签发送到不同的 Elasticsearch 索引:
@type forest subtype elasticsearch remove_prefix my.logs logstash_prefix ${tag} # ...
另一个选项在动态配置一节中描述。
注意:如果使用或计算 Fluentd v0.14,可以使用内置占位符。更多细节,请参阅占位符部分。
4.79. 动态配置注意:out_elasticsearch_dynamic将被计划标记为deprecated。请不要使用新的 Fluentd 配置。这个插件是为了向后兼容而维护的。
如果希望配置依赖于消息中的信息,可以使用elasticsearch_dynamic。这是 Elasticsearch 插件的一个实验性变化,允许以如下方式指定配置值:
@type elasticsearch_dynamic hosts ${record['host1']}:9200,${record['host2']}:9200 index_name my_index.${Time.at(time).getutc.strftime(@logstash_dateformat)} logstash_prefix ${tag_parts[3]} port ${9200+rand(4)} index_name ${tag_parts[2]}-${Time.at(time).getutc.strftime(@logstash_dateformat)}
请注意,这对每条消息都使用了 Ruby 的eval,因此存在性能和安全问题。
4.80. 占位符v0.14 占位符可以处理 tag 的${tag}, 像%Y%m%d的 strftime 格式,和自定义记录键,像record["mykey"]。
注意,自定义块键对于record_reformer和record_modifier是不同的表示法。它们使用record["some_key"]来指定占位符,但是这个特性使用${key1},${key2}符号。而 tag、time 和一些任意键必须包含在缓冲区指令的属性中。
它们的用法如下:
tag
@type elasticsearch index_name elastic.${tag} #=> 替换为每个事件的 tag。例如 elastic.test.tag @type memory #
time
@type elasticsearch index_name elastic.%Y%m%d #=> 例如 elastic.20170811 @type memory timekey 3600 #
custom key
records = {key1: "value1", key2: "value2"}
4.81. 多工作者@type elasticsearch index_name elastic.${key1}.${key2} # => 例如 elastic.value1.value2 @type memory #
自 Fluentd v0.14 以来,已经实现了多工作者特性,以提高多进程的吞吐量。该特性允许 Fluentd 进程使用一个或多个 CPU。此功能将通过以下系统配置启用:
4.82. log_es_400_reasonworkers N # where N is a natural number (N >= 1).
默认情况下,错误日志记录器不会记录来自 Elasticsearch API 的400错误的原因,除非您将log_level设置为debug。但是,这会导致大量的日志垃圾,如果您想要的只是400的错误原因,那么这是不可取的。您可以将此设置为true以捕获400的错误原因,而不需要所有其他调试日志。
默认值是false。
4.83. suppress_doc_wrap默认情况下,记录主体用'doc'包装。此行为不能处理更新脚本请求。您可以将此设置为禁止文档包装并允许不触及记录主体。
默认值是false。
4.84. ignore_exceptions一个将被忽略的异常列表 - 当异常发生时,块将被丢弃,缓冲区重试机制将不会被调用。也可以在层次结构的较高层指定类。例如
ignore_exceptions ["Elasticsearch::Transport::Transport::ServerError"]
将匹配ServerError的所有子类 - Elasticsearch::Transport::Transport::Errors::BadRequest,Elasticsearch::Transport::Transport::Errors::ServiceUnavailable等。
默认值为空列表(不忽略任何异常)。
4.85. exception_backup当忽略异常发生时,是否备份块。默认值是true。
4.86. bulk_message_request_threshold配置bulk_message请求分割阈值大小。
默认值为 -1(无限)。
如果您指定这个大小为负数,bulk_message请求拆分功能将被禁用。
4.87. enable_ilm启用索引生命周期管理(ILM)。
默认值是false。
注意:此参数要求安装 elasticsearch-xpack gem。
4.88. ilm_policy_id指定 ILM 策略 id。
默认值为logash-policy。
注意:此参数要求安装 elasticsearch-xpack gem。
4.89. ilm_policy将 ILM 策略内容指定为 Hash。
默认值为{}。
注意:此参数要求安装 elasticsearch-xpack gem。
4.90. ilm_policies哈希格式为{"ilm_policy_id1":{
默认值为{}。
注意:此参数要求安装 elasticsearch-xpack gem。
4.91. ilm_policy_overwrite指定是否覆盖 ilm 策略。
默认值为false。
注意:此参数要求安装 elasticsearch-xpack gem。
4.92. truncate_caches_interval指定截断缓存间隔。
如果设置了此参数,则会启动并执行用于清除alias_indexes和template_names缓存的计时器。
默认值为nil。
4.93. use_legacy_template是否使用遗留模板。
对于 Elasticsearch 7.8 或更高版本,如果template_file包含可组合索引模板,用户可以将此参数指定为false。
对于 Elasticsearch 7.7 或更早版本,用户应该将此参数指定为true。
可组合模板文档是 Put Index template API | Elasticsearch Reference,而遗留模板文档是 Index Templates | Elasticsearch Reference。
当使用此参数开启全新特性时,请确认使用的 Elasticsearch 集群是否支持可组合模板特性。
4.94.用户可以指定是否将chunk_id信息包含到记录中:
4.94.1. include_chunk_id@type elasticsearch # Other configurations. include_chunk_id true # chunk_id_key chunk_id # 默认值是 "chunk_id".
是否包含chunk_id为否。默认值为false。
4.94.2. chunk_id_key@type elasticsearch # Other configurations. include_chunk_id true
指定chunk_id_key,将chunk_id信息存储到记录中。默认值为chunk_id。
4.95. @log_level(可选的)@type elasticsearch # Other configurations. include_chunk_id chunk_id_key chunk_hex
@log_level 选项允许用户为每个插件设置不同的日志级别。支持的日志级别:
fatal,error,warn,info,debug,trace。
请参阅日志文章了解更多细节。
5. 其他您可以使用%{}样式的占位符来转义 URL 编码所需的字符。
有效的配置:
user %{demo+}
password %{@secret}
hosts https://%{j+hn}:%{passw@rd}@host1:443/elastic/,http://host2
无效的配置:
user demo+ password @secret6. 常用的输出 / 缓冲区参数
有关常用输出/缓冲区参数,请参阅以下文章:
- 输出插件概述
- 缓冲部分配置
参考 Elasticsearch 的故障排除部分,内容已翻译。
7.1. 无法向 Elasticsearch 发送事件失败的一个常见原因是,您试图连接到版本不兼容的 Elasticsearch 实例。
例如,td-agent目前捆绑了6.x系列的elasticsearch-ruby库。这意味着您的 Elasticsearch 服务器也需要是6.x版本。通过执行以下命令,可以检查安装在系统上的客户端库的实际版本。
# td-agent 的用户 $ /usr/sbin/td-agent-gem list elasticsearch # 独立运行 Fluentd 的用户 $ fluent-gem list elasticsearch
或者,fluent-plugin-elasticsearch v2.11.7或更高版本,用户可以使用validate_client_version选项检查版不兼容的情况:
validate_client_version true
如果你收到以下错误信息,请考虑安装兼容的 elasticsearch 客户端 gems:
Detected ES 5 but you use ES client 6.1.0. Please consider to use 5.x series ES client.
有关版本兼容性问题的详细信息,请阅读官方手册。
7.2. 无法看到详细的失败日志失败的一个常见原因是,您试图连接一个ssl协议版本不兼容的 Elasticsearch 实例。
例如,由于历史原因,out_elasticsearch将ssl_version设置为TLSv1。现代 Elasticsearch 生态系统要求使用TLS v1.2或更高版本进行通信。但是,在本例中,out_elasticsearch默认隐藏了传输器部件故障日志。如果您想获取传输器日志,请考虑设置以下配置:
with_transporter_log true @log_level debug
然后,Fluentd 日志中显示如下日志:
2018-10-24 10:00:00 +0900 [error]: #0 [Faraday::ConnectionFailed] SSL_connect returned=1 errno=0 state=SSLv2/v3 read server hello A: unknown protocol (OpenSSL::SSL::SSLError) {:host=>"elasticsearch-host", :port=>80, :scheme=>"https", :user=>"elastic", :password=>"changeme", :protocol=>"https"}
说明使用了不合适的TLS协议版本。如果您想使用TLS v1.2,请使用ssl_version参数如下:
ssl_version TLSv1_2
或者,在v4.0.2或集成Ruby 2.5或更高Ruby版本的更高版本中,以下配置也有效:
ssl_max_version TLSv1_2 ssl_min_version TLSv1_27.3. 无法连接启用 TLS 的反向代理
失败的一个常见原因是,你试图连接到 nginx 反向代理后面的 Elasticsearch 实例,该实例使用了不兼容的ssl协议版本。
例如,由于历史原因,out_elasticsearch将ssl_version设置为TLSv1。目前,nginx 反向代理出于安全考虑使用TLS v1.2或更高版本。但是,在本例中,out_elasticsearch默认隐藏了传输器部件故障日志。
如果你使用TLS v1.2设置 nginx 反向代理:
server {
listen :9400;
server_name ;
ssl on;
ssl_certificate /etc/ssl/certs/server-bundle.pem;
ssl_certificate_key /etc/ssl/private/server-key.pem;
ssl_client_certificate /etc/ssl/certs/ca.pem;
ssl_verify_client on;
ssl_verify_depth 2;
# Reference : https://cipherli.st/
ssl_protocols TLSv1.2;
ssl_prefer_server_ciphers on;
ssl_ciphers "EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH";
ssl_ecdh_curve secp384r1; # Requires nginx >= 1.1.0
ssl_session_cache shared:SSL:10m;
ssl_session_tickets off; # Requires nginx >= 1.5.9
ssl_stapling on; # Requires nginx >= 1.3.7
ssl_stapling_verify on; # Requires nginx => 1.3.7
resolver 127.0.0.1 valid=300s;
resolver_timeout 5s;
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload";
add_header X-frame-Options DENY;
add_header X-Content-Type-Options nosniff;
client_max_body_size 64M;
keepalive_timeout 5;
location / {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_pass http://localhost:9200;
}
}
然后,nginx 反向代理从TLSv1.2开始。
Fluentd 突然死亡,日志如下:
Oct 31 9:44:45fluentd[6442]: log writing failed. execution expired Oct 31 9:44:45 fluentd[6442]: /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/ssl_socket.rb:10:in `initialize': stack level too deep (SystemStackError) Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/connection.rb:429:in `new' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/connection.rb:429:in `socket' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/connection.rb:111:in `request_call' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/middlewares/mock.rb:48:in `request_call' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/middlewares/instrumentor.rb:26:in `request_call' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/middlewares/base.rb:16:in `request_call' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/middlewares/base.rb:16:in `request_call' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/excon-0.62.0/lib/excon/middlewares/base.rb:16:in `request_call' Oct 31 9:44:45 fluentd[6442]: ... 9266 levels... Oct 31 9:44:45 fluentd[6442]: from /opt/td-agent/embedded/lib/ruby/site_ruby/2.4.0/rubygems/core_ext/kernel_require.rb:55:in `require' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.5/bin/fluentd:8:in ` ' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/bin/fluentd:22:in `load' Oct 31 9:44:45 fluentd[6442]: from /opt/fluentd/embedded/bin/fluentd:22:in ` ' Oct 31 9:44:45 systemd[1]: fluentd.service: Control process exited, code=exited status=1
如果您想获取传输器日志,请考虑设置以下配置:
with_transporter_log true @log_level debug
然后,Fluentd 日志中显示如下日志:
2018-10-31 10:00:57 +0900 [warn]: #7 [Faraday::ConnectionFailed] Attempt 2 connecting to {:host=>"", :port=>9400, :scheme=>"https", :protocol=>"https"}
2018-10-31 10:00:57 +0900 [error]: #7 [Faraday::ConnectionFailed] Connection reset by peer - SSL_connect (Errno::ECONNRESET) {:host=>"", :port=>9400, :scheme=>"https", :protocol=>"https"}
以上日志说明,fluent-plugin-elasticsearch与 nginx 反向代理使用不兼容的SSL/TLS版本是导致此问题的根本原因。
如果您想使用TLS v1.2,请使用ssl_version参数如下:
ssl_version TLSv1_2
或者,在v4.0.2或集成Ruby 2.5或更高Ruby版本的更高版本中,以下配置也有效:
ssl_max_version TLSv1_2 ssl_min_version TLSv1_27.4. 被拒绝的日志将永远重新提交,为什么?
有时用户会这样编写 Fluentd 配置:
@type elasticsearch host localhost port 9200 type_name fluentd logstash_format true time_key @timestamp include_timestamp true reconnect_on_error true reload_on_failure true reload_connections false request_timeout 120s
上面的配置没有使用@label特性,而是使用glob(**)模式。这通常是有问题的配置。
在错误场景中,错误事件将使用@ERRORlabel 和fluent.*tag。black hole glob模式会将一个有问题的事件重新提交到推送 Elasticsearch 管道中。
这种情况导致大量的日志被拒绝:
2018-11-13 11:16:27 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="app.fluentcat" time=2018-11-13 11:16:17.492985640 +0000 record={"message"=>"xFFxAD"}
2018-11-13 11:16:38 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="fluent.warn" time=2018-11-13 11:16:27.978851140 +0000 record={"error"=>"#", "location"=>nil, "tag"=>"app.fluentcat", "time"=>2018-11-13 11:16:17.492985640 +0000, "record"=>{"message"=>"xFFxAD"}, "message"=>"dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="app.fluentcat" time=2018-11-13 11:16:17.492985640 +0000 record={"message"=>"\xFF\xAD"}"}
然后,用户应该使用更具体的 tag 路由或使用@label。下面几节将展示两个示例,说明如何解决被拒绝日志的泛滥问题。一种是使用具体的 tag 路由,另一种是使用 label 路由。
使用具体的 tag 路由
下面的配置使用了具体的 tag 路由:
@type elasticsearch host localhost port 9200 type_name fluentd logstash_format true time_key @timestamp include_timestamp true reconnect_on_error true reload_on_failure true reload_connections false request_timeout 120s
使用 label 功能
下面的配置使用了 label:
7.5. 建议安装 typhoeus gem,为什么?@type forward @label @ES
fluent-plugin-elasticsearch默认不依赖于typhoeus gem。如果您想使用typhoeus backend,您必须自己安装typhoeus gem。
如果你使用简单的 Fluentd,你可以通过以下方式安装它:
gem install typhoeus
但是,你使用 td-agent 而不是简单 Fluentd,你必须使用td-agent-gem:
td-agent-gem install typhoeus
更多细节,请参考官方插件管理文档。
7.6. 停止在 k8s 上发送事件,为什么?fluent-plugin-elasticsearch在10000个请求后重新加载连接(与事件计数不是对应的,因为 ES 插件使用批量 API)。
这个源自elasticsearch-ruby gem的功能是默认启用的。有时候,这种重载功能会影响用户使用 ES 插件发送事件。
在 k8s 平台上,用户有时需要指定以下设置:
reload_connections false reconnect_on_error true reload_on_failure true
如果使用fluentd-kubernetes-daemonset,可以使用环境变量指定它们:
- FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS设为false。
- FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR设为false。
- FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE设为false。
这个问题在第525号被报道过。
7.7. 随机发生 400 - Rejected by Elasticsearch,为什么?在 Elasticsearch 中安装的索引模板有时会产生400 - Rejected by Elasticsearch错误。例如,kubernetes 审计日志的结构如下:
"responseObject":{
"kind":"SubjectAccessReview",
"apiVersion":"authorization.k8s.io/v1beta1",
"metadata":{
"creationTimestamp":null
},
"spec":{
"nonResourceAttributes":{
"path":"/",
"verb":"get"
},
"user":"system:anonymous",
"group":[
"system:unauthenticated"
]
},
"status":{
"allowed":true,
"reason":"RBAC: allowed by ClusterRoleBinding "cluster-system-anonymous" of ClusterRole "cluster-admin" to User "system:anonymous""
}
},
最后一个元素status有时会变成"status":"Success"。此元素类型错误导致状态 400 错误。有一些解决方案来解决这个问题:
解决方案 1
对于导致元素类型出现故障的键。使用动态映射的模板如下:
{
"template": "YOURINDEXNAME-*",
"mappings": {
"fluentd": {
"dynamic_templates": [
{
"default_no_index": {
"path_match": "^.*$",
"path_unmatch": "^(@timestamp|auditID|level|stage|requestURI|sourceIPs|metadata|objectRef|user|verb)(\..+)?$",
"match_pattern": "regex",
"mapping": {
"index": false,
"enabled": false
}
}
}
]
}
}
}
注意YOURINDEXNAME应该用你正在使用的索引前缀替换。
解决方案 2
对于不稳定的responseObject和requestObject键存在的情况。
@id kube_api_audit_normalize @type record_transformer auto_typecast false enable_ruby true host "#{ENV['K8S_NODE_NAME']}" responseObject ${record["responseObject"].nil? ? "none": record["responseObject"].to_json} requestObject ${record["requestObject"].nil? ? "none": record["requestObject"].to_json} origin kubernetes-api-audit
需要使用record_transformer和其他类似的插件来规范responseObject和requestObject键。
7.8. Fluentd 似乎挂起了,不能连接 Elasticsearch,为什么?在#configure阶段,ES 插件等待直到和 ES 实例通信成功才会启动 Fluentd。因为 Fluentd 要求在#configure阶段必须正确设置配置。
在#configure阶段之后,如果它运行得非常快,并且在一些频繁使用的情况下大量发送事件。在这个场景中,在#configure阶段之前,我们需要正确地设置配置。我们提供的默认参数过于保守,无法应用于高级用户。要删除过于保守的设置,可以使用以下配置:
7.9. 启用索引生命周期管理@type elasticsearch # 一些高级用户知道他们使用的ES版本。 # 我们可以禁用启动ES版本检查。 verify_es_version_at_startup false # 如果你知道你使用的ES主版本是7,你可以在这里设置为7。 default_elasticsearch_version 7 # 如果使用的ES集群非常稳定,可以减少重试操作次数。(最低量是1) max_retry_get_es_version 1 # 如果使用的ES集群非常稳定,可以减少重试操作次数。(最低量是1) max_retry_putting_template 1 # ... 和一些ES插件配置
索引生命周期管理(ILM)是基于模板的索引管理特性。ILM的主要特征参数有:
- index_name(当logstash_format为false时)
- logstash_prefix(当logstash_format为true时)
- enable_ilm
- ilm_policy_id
- ilm_policy
- 高级用法参数
- application_name
- index_separator
它们并非都是强制性参数,但实际上它们是用于ILM特性的。
ILM目标索引别名使用index_name或从logstash_prefix计算的索引创建。
在 Elasticsearch 插件v4.0.0中,ILM目标索引将从index_name(普通模式)或logstash_prefix(与logstash_formatas true一起使用)计算。
注意:在 Elasticsearch 插件v4.1.0之前,当ILM启用时使用deflector_alias参数是允许和处理的,但是,在4.1.1或更高版本中,当ILM启用时它不能使用。
而且,ILM特性的用户应该为ILM支持的索引指定他们的 Elasticsearch 模板。因为ILM设置被注入到他们的 Elasticsearch 模板中。
application_name和index_separator也会影响索引名称别名。但是这个参数是为高级用途准备的。它通常应该与默认值一起使用,默认值为default。
然后,ILM参数在别名索引中使用,如:
简单的index_name情况:
-000001
logstash_format为true的情况:
-000001
ILM 设置示例
index_name fluentd-${tag}
application_name ${tag}
index_date_pattern "now/d"
enable_ilm true
# 策略配置
ilm_policy_id fluentd-policy
# ilm_policy {} # 使用默认策略
template_name your-fluentd-template
template_file /path/to/fluentd-template.json
# customize_template {"<>": "fluentd"}
注意:这个插件只创建启用了滚动的索引,这些索引是指向它们和索引模板的别名,如果启用了,则创建一个ILM策略。
每天创建 ILM 索引
如果你想每天创建新的索引,你应该使用logstash_format类型配置:
logstash_prefix fluentd
application_name default
index_date_pattern "now/d"
enable_ilm true
# 策略配置
ilm_policy_id fluentd-policy
# ilm_policy {} # 使用默认策略
template_name your-fluentd-template
template_file /path/to/fluentd-template.json
请注意,如果您每天创建一组新的索引,那么 elasticsearch ILM 策略系统将分别对待每一天,并始终为每一天维护一个单独的活动写索引。
如果您有基于 max_age 的滚动,它将继续滚动之前日期的索引,即使没有新的文档被索引。如果您想在一段时间后删除索引,那么 ILM 策略将永远不会删除当前的写索引,而不管它的年代如何,因此您将需要一个单独的系统,比如管理员,来实际删除旧索引。
由于这个原因,如果您使用 ILM 将日期放入索引名称中,则应该只根据文档的大小或数量进行滚动,并且可能需要使用管理器来实际删除旧索引。
固定 ILM 索引
此外,用户还可以使用固定的 ILM 索引配置。如果index_date_pattern设置为""(空字符串),Elasticsearch 插件将不会在 ILM 索引中附加日期模式:
index_name fluentd
application_name default
index_date_pattern ""
enable_ilm true
# Policy configurations
ilm_policy_id fluentd-policy
# ilm_policy {} # Use default policy
template_name your-fluentd-template
template_file /path/to/fluentd-template.json
配置动态索引或模板
有些用户想为动态索引 / 模板设置 ILM。Elasticsearch 模板中的index_patterns和template.settings.index.lifecycle.name将被 Elasticsearch 插件覆盖:
{
"index_patterns": ["mock"],
"template": {
"settings": {
"index": {
"lifecycle": {
"name": "mock",
"rollover_alias": "mock"
},
"number_of_shards": "<>",
"number_of_replicas": "<>"
}
}
}
}
模板将被处理:
@type http port 5004 bind 0.0.0.0 body_size_limit 32m keepalive_timeout 10s @type json @type elasticsearch @id out_es_etl_webserver @log_level info include_tag_key true host $HOST port $PORT path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}" request_timeout "#{ENV['FLUENT_ELASTICSEARCH_REQUEST_TIMEOUT'] || '30s'}" scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}" ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}" ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1'}" reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}" reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}" reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}" log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}" logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'etl-webserver'}" logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'false'}" index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'etl-webserver'}" type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}" time_key "#{ENV['FLUENT_ELASTICSEARCH_TIME_KEY'] || '@timestamp'}" include_timestamp "#{ENV['FLUENT_ELASTICSEARCH_INCLUDE_TIMESTAMP'] || 'true'}" # ILM Settings - WITH ROLLOVER support # https://github.com/uken/fluent-plugin-elasticsearch#enable-index-lifecycle-management application_name "etl-webserver" index_date_pattern "" # 策略配置 enable_ilm true ilm_policy_id etl-webserver ilm_policy_overwrite true ilm_policy {"policy": {"phases": {"hot": {"min_age": "0ms","actions": {"rollover": {"max_age": "5m","max_size": "3gb"},"set_priority": {"priority": 100}}},"delete": {"min_age": "30d","actions": {"delete": {"delete_searchable_snapshot": true}}}}}} use_legacy_template false template_name etl-webserver template_file /configs/index-template.json template_overwrite true customize_template {"< >": "3","< >": "0"} flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}" flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}" chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '8MB'}" total_limit_size "#{ENV['FLUENT_ELASTICSEARCH_TOTAL_LIMIT_SIZE'] || '450MB'}" queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}" retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '60s'}" retry_forever false
欲了解更多详情,请参阅讨论:https://github.com/uken/fluent-plugin-elasticsearch/issues/867。
7.10. 如何指定索引编解码器Elasticsearch 可以处理存储数据的压缩方法,如 LZ4 和 best_compression。fluent-plugin-elasticsearch 不提供指定压缩方法的 API。
用户可以通过模板指定存储数据的压缩方式,创建压缩compression.json如下:
{
"order": 100,
"index_patterns": [
"YOUR-INDEX-PATTERN"
],
"settings": {
"index": {
"codec": "best_compression"
}
}
}
然后,在你的配置中指定上面的模板:
template_name best_compression_tmpl template_file compression.json
Elasticsearch 将使用best_compression存储数据:
% curl -XGET 'http://localhost:9200/logstash-2019.12.06/_settings?pretty'
{
"logstash-2019.12.06" : {
"settings" : {
"index" : {
"codec" : "best_compression",
"number_of_shards" : "1",
"provided_name" : "logstash-2019.12.06",
"creation_date" : "1575622843800",
"number_of_replicas" : "1",
"uuid" : "THE_AWESOMEUUID",
"version" : {
"created" : "7040100"
}
}
}
}
}
7.11. 当 connect_write timeout reached 时,无法将日志推入Elasticsearch,为什么?
看起来是 Elasticsearch 集群已经耗尽了。通常,Fluentd 会打印如下日志:
2019-12-29 00:23:33 +0000 [warn]: buffer flush took longer time than slow_flush_log_threshold: elapsed_time=27.283766102716327 slow_flush_log_threshold=15.0 plugin_id="object:aaaffaaaaaff" 2019-12-29 00:23:33 +0000 [warn]: buffer flush took longer time than slow_flush_log_threshold: elapsed_time=26.161768959928304 slow_flush_log_threshold=15.0 plugin_id="object:aaaffaaaaaff" 2019-12-29 00:23:33 +0000 [warn]: buffer flush took longer time than slow_flush_log_threshold: elapsed_time=28.713624476008117 slow_flush_log_threshold=15.0 plugin_id="object:aaaffaaaaaff" 2019-12-29 01:39:18 +0000 [warn]: Could not push logs to Elasticsearch, resetting connection and trying again. connect_write timeout reached 2019-12-29 01:39:18 +0000 [warn]: Could not push logs to Elasticsearch, resetting connection and trying again. connect_write timeout reached
此警告通常是由于 Elasticsearch 集群资源不足而导致的。
当 CPU 占用率急剧上升,Elasticsearch 集群正在消耗 CPU 资源时,可能是由于 CPU 资源不足导致。检查 Elasticsearch 集群的健康状态和资源使用情况。



