使用 NLP 和模式匹配检测、评估和编辑日志中的个人身份信息 – 第 2 部分
2024年11月6日 | by mebius
作者:来自 ElasticStephen Brown
如何使用 Elasticsearch、NLP 和模式匹配检测、评估和编辑日志中的 PII。
简介:
分布式系统中高熵日志的普遍存在大大增加了 PII(Personally Identifiable Information – 个人身份信息)渗入我们日志的风险,这可能导致安全和合规性问题。这篇由两部分组成的博客深入探讨了使用 Elastic Stack 识别和管理此问题的关键任务。我们将探索使用 NLP(Natural Language Processing – 自然语言处理)和模式匹配来检测、评估并在可行的情况下从被提取到 Elasticsearch 的日志中删除 PII。
在本博客的第 1 部分中,我们介绍了以下内容:
- 回顾我们可用于管理日志中的 PII 的技术和工具
- 了解 NLP/NER 在 PII 检测中的作用
- 构建可组合的处理管道以检测和评估 PII
- 对日志进行采样并通过 NER 模型运行它们
- 评估 NER 模型的结果
在本博客的第 2 部分中,我们将介绍以下内容:
- 应用编辑正则表达式模式处理器并评估结果
- 使用 ES|QL 创建警报
- 应用字段级安全性来控制对未编辑数据的访问
- 生产注意事项和扩展
- 如何在传入或历史数据上运行这些流程
提醒我们将在 2 个博客中构建的总体流程:
本练习的所有代码均可在以下位置找到:https://github.com/bvader/elastic-pii。
第 1 部分先决条件
本博客从本博客第 1 部分结束的地方开始。你必须安装并运行第 1 部分中的 NER 模型、采集管道和仪表板。
- 加载和配置 NER 模型
- 安装了博客第 1 部分中的所有可组合采集管道
- 安装仪表板
你可以在此处访问博客 1 的完整解决方案。不要忘记加载仪表板,可在此处找到。
运用编辑(redact)处理器
接下来,我们将应用 redact 处理器。redact 处理器是一个简单的基于正则表达式的处理器,它获取正则表达式模式列表并在字段中查找它们,并在找到时将它们替换为文字。编辑处理器性能合理,可以大规模运行。最后,我们将在下面的生产扩展部分详细讨论这一点。
Elasticsearch 附带了许多有用的预定义模式(patterns),redact 处理器可以方便地引用这些模式。如果一个模式不符合你的需求,请创建一个具有自定义定义的新模式。 Redact 处理器会替换每次匹配。如果有多个匹配,它们将全部替换为模式名称。更多关于 redact 处理器的用法,请参阅文章 “Elasticsearch:Redact(编辑) processor”。
在下面的代码中,我们利用了一些预定义模式以及构建了几个自定义模式。
"patterns": [
"%{EMAILADDRESS:EMAIL_REGEX}",
我们还用易于识别的模式替换了 PII,以便用于评估。
此外,值得注意的是,由于 redact 处理器是一个简单的正则表达式查找和替换,因此它可以用于许多 “秘密(secrets)” 模式,而不仅仅是 PII。有许多关于正则表达式和秘密模式的参考资料,因此你可以重复使用此功能来检测日志中的秘密。
以下两段代码可在此处找到。
redact 处理器管道代码:
# Add the PII redact processor pipeline
DELETE _ingest/pipeline/logs-pii-redact-processor
PUT _ingest/pipeline/logs-pii-redact-processor
{
"processors": [
{
"set": {
"field": "redact.proc.successful",
"value": true
}
},
{
"set": {
"field": "redact.proc.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.message == null",
"field": "redact.message",
"copy_from": "message"
}
},
{
"redact": {
"field": "redact.message",
"prefix": "",
"patterns": [
"%{EMAILADDRESS:EMAIL_REGEX}",
"%{IP:IP_ADDRESS_REGEX}",
"%{CREDIT_CARD:CREDIT_CARD_REGEX}",
"%{SSN:SSN_REGEX}",
"%{PHONE:PHONE_REGEX}"
],
"pattern_definitions": {
"CREDIT_CARD": """d{4}[ -]d{4}[ -]d{4}[ -]d{4}""",
"SSN": """d{3}-d{2}-d{4}""",
"PHONE": """(+d{1,2}s?)?1?-?.?s?(?d{3})?[s.-]?d{3}[s.-]?d{4}"""
},
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "failure",
"value": "REDACT_PROCESSOR_FAILED",
"override": false
}
},
{
"set": {
"field": "redact.proc.successful",
"value": false
}
}
]
}
},
{
"set": {
"if": "ctx?.redact?.metgcodessage.contains('REDACTPROC')",
"field": "redact.proc.found",
"value": true
}
},
{
"set": {
"if": "ctx?.redact?.pii?.found == null",
"field": "redact.pii.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.proc?.found == true",
"field": "redact.pii.found",
"value": true
}
}
],
"on_failure": [
{
"set": {
"field": "failure",
"value": "GENERAtgcodeL_FAILURE",
"override": false
}
}
]
}
现在,我们将 logs-pii-redact-processor 管道添加到整体 process-pii 管道中。
redact 处理器管道代码:
# Updated Process PII pipeline that now call the NER and Redact Processor pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-pii-redact-processor"
}
}
]
}
按照重新加载日志(在下面描述)中的说明重新加载数据。如果你第一次没有生成日志,请按照数据加载附录(在下面描述)中的说明进行操作
转到 Discover 并在 KQL 栏中输入以下内容 sample.sampled : true 和 redact.message: REDACTPROC 并将 redact.message 添加到表中,你应该会看到类似这样的内容。
如果你尚未从博客第 1 部分加载仪表板,请加载它,可以使用 Kibana -> Stack Management -> Saved Objects -> Import 在此处找到它。
它现在应该看起来像这样。请注意,仪表板的 REGEX 部分现在处于活动状态。
检查点
此时,我们具有以下功能:
- 能够对传入日志进行采样并应用此 PII 编辑
- 使用 NER/NLP 和模式匹配检测和评估 PII
- 评估 PII 检测的数量、类型和质量
如果你只是运行一次以查看其工作原理,那么这是一个很好的停止点,但我们还有一些步骤可以使其在生产系统中发挥作用。
- 清理工作和未编辑(unredacted)的数据
- 更新仪表板以使用清理后的数据
- 应用基于角色的访问控制来保护原始未编辑的数据
- 创建警报
- 生产和扩展注意事项
- 如何在传入或历史数据上运行这些流程
应用于生产系统
清理工作数据并更新仪表板
现在我们将清理代码添加到整个 process-pii 管道中。
简而言之,我们设置一个标志 redact.enable: true,指示管道将未编辑的消息字段移动到 raw.message,并将编辑的消息字段 redact.message 移动到 message 字段。我们将在下一节中 “保护” raw.message。
注意:当然,如果你想完全删除未编辑的数据,你可以更改此行为。在本练习中,我们将保留并保护它。
此外,我们设置 redact.cleanup: true 来清理 NLP 工作数据。
这些字段允许你对决定保留和分析的数据进行大量控制。
以下两段代码可在此处找到。
redact 处理器管道代码:
# Updated Process PII pipeline that now call the NER and Redact Processor pipeline and cleans up
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-pii-redact-processor"
}
},
{
"set": {
"description": "Set to true to actually redact, false will run processors but leave original",
"field": "redact.enable",
"value": true
}
},
{
"rename": {
"if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
"field": "message",
"target_field": "raw.message"
}
},
{
"rename": {
"if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
"field": "redact.message",
"target_field": "message"
}
},
{
"set": {
"description": "Set to true to actually to clean up working data",
"field": "redact.cleanup",
"value": true
}
},
{
"remove": {
"if": "ctx?.redact?.cleanup == true",
"field": [
"ml"
],
"ignore_failure": true
}
}
]
}
按照此处重新加载数据中的说明来重新加载日志中的说明。
转到 Discover 并在 KQL 栏中输入以下内容 sample.sampled : true 和 redact.pii.found: true 并将以下字段添加到表中:
message,raw.message,redact.ner.found,redact.proc.found,redact.pii.found
你应该看到类似这样的内容:
我们拥有保护 PII 和发出警报所需的一切。
加载处理已清理数据的新仪表板
要加载仪表板,请转到 Kibana -> Stack Management -> Saved Objects,然后导入 pii-dashboard-part-2.ndjson 文件(可在此处找到)。
新仪表板应如下所示。 注意:由于我们已经清理了底层数据,因此它在幕后使用了不同的字段。
你应该看到类似这样的内容
应用基于角色的访问控制来保护原始未编辑数据
Elasticsearch 支持基于角色的访问控制,包括字段和文档级访问控制;它大大降低了保护应用程序所需的操作和维护复杂性。
我们将创建一个不允许访问 raw.message 字段的角色,然后创建一个用户并为该用户分配角色。使用该角色,用户将只能看到编辑后的消息(现在位于消息字段中),但无法访问受保护的 raw.message 字段。
注意:由于我们在本练习中仅采样了 10% 的数据,因此未采样的消息字段不会移动到 raw.message,因此它们仍然可见,但这显示了你可以在生产系统中应用的功能。
以下代码部分的代码可在此处找到。
RBAC protect-pii 角色和用户代码:
# Create role with no access to the raw.message field
GET _security/role/protect-pii
DELETE _security/role/protect-pii
PUT _security/role/protect-pii
{
"cluster": [],
"indices": [
{
"names": [
"logs-*"
],
"privileges": [
"read",
"view_index_metadata"
],
"field_security": {
"grant": [
"*"
],
"except": [
"raw.message"
]
},
"allow_restricted_indices": false
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": [
"all"
],
"resources": [
"*"
]
}
],
"run_as": [],
"metadata": {},
"transient_metadata": {
"enabled": true
}
}
# Create user stephen with protect-pii role
GET _security/user/stephen
DELETE /_security/user/stephen
POST /_security/user/stephen
{
"password" : "mypassword",
"roles" : [ "protect-pii" ],
"full_name" : "Stephen Brown"
}
# Create user stephen with protect-pii role
GET _security/user/stephen
DELETE /_security/user/stephen
POST /_security/user/stephen
{
"password" : "mypassword",
"roles" : [ "protect-pii" ],
"full_name" : "Stephen Brown"
}
现在使用具有 protect-pii 角色的新用户 stephen 登录到单独的窗口。转到 Discover 并在 KQL 栏中输入 redact.pii.found : true 并将 message 字段添加到表中。另外,请注意 raw.message 不可用。
你应该看到类似这样的内容
检测到 PII 时创建警报
现在,通过处理管道,在检测到 PII 时创建警报很容易。如有必要,请详细查看 Kibana 中的警报。
注意:如果需要获取最新数据,请重新加载日志。
首先,我们将在 Discover 中创建一个简单的 ES|QL 查询。
代码可以在这里找到。
FROM logs-pii-default
| WHERE redact.pii.found == true
| STATS pii_count = count(*)
| WHERE pii_count > 0
当你运行它时你应该会看到类似这样的内容。
现在单击 “Alerts” 菜单并选择 “Create search threshold rule”,并将创建一个警报,在发现 PII 时提醒我们。
Select a time field: @timestamp Set the time window: 5 minutes
假设你最近在运行 Test 时加载了数据,它应该执行类似以下操作:
pii_count : 343 Alerts generated query matched
当警报处于活动状态时添加操作。
For each alert: On status changes Run when: Query matched
Elasticsearch query rule {{rule.name}} is active:
- PII Found: true
- PII Count: {{#context.hits}} {{_source.pii_count}}{{/context.hits}}
- Conditions Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}
添加警报恢复时的操作。
For each alert: On status changes Run when: Recovered
Elasticsearch query rule {{rule.name}} is Recovered:
- PII Found: false
- Ctgcodeonditions Not Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}
当所有设置完成后它应该看起来像这样并保存
如果你有最新数据,你应该会收到如下所示的活动警报。我已将我的数据发送到 Slack(聊天软件)。
Elasticsearch query rule pii-found-esql is active:
- PII Found: true
- PII Count: 374
- Conditions Met: Query matched documents over 5m
- Timestamp: 2024-10-15T02:44:52.795Z
- Link: https://mydeployment123.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989
然后,如果你等待,你将收到如下所示的恢复警报。
Elasticsearch query rule pii-found-esql is Recovered:
- PII Found: false
- Conditions Not Met: Query did NOT match documents over 5m
- Timestamp: 2024-10-15T02:49:04.815Z
- Link: https://mydeployment123.kb.us-west-1.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989
生产扩展
NER 扩展
正如我们在本博客的第 1 部分中提到的,NER/NLP 模型是 CPU 密集型的,大规模运行成本高昂;因此,我们采用了一种采样技术来了解日志中的风险,而无需通过 NER 模型发送完整的日志量。
请查看博客第 1 部分中 NER 模型的设置和配置。
我们为 PII 案例选择了基本 BERT NER 模型 bert-base-NER。
为了扩展摄取,我们将专注于扩展已部署模型的分配。有关此主题的更多信息,请点击此处。分配的数量必须小于每个节点可用的分配处理器(核心,而不是 vCPU)。
以下指标与博客第 1 部分中的模型和配置相关。
- 4 个分配,允许更多并行摄取
- 每个分配 1 个线程
- 0 次缓存,因为我们预计缓存命中率较低 注意如果有许多重复的日志,缓存可以提供帮助,但如果有时间戳和其他变化,缓存将无济于事,甚至会减慢进程
- 8192 队列
GET _ml/trained_models/dslim__bert-base-ner/_stats
.....
"node": {
"0m4tq7tMRC2H5p5eeZoQig": {
.....
"attributes": {
"xpack.installed": "true",
"region": "us-west-1",
"ml.allocated_processors": "5",
上面有 3 个关键信息:
- “ml.allocated_processors”:“5” 可用的物理核心/处理器数量
- “number_of_allocations”:4 分配数量,每个物理核心最多 1 个。注意:我们可以使用 5 个分配,但我们只为本次练习分配了 4 个
- “average_inference_time_ms”:138.44285714285715 每个文档的平均推理时间。
对于每分钟推理 (Inferences per Min – IPM) 每分配(每个物理核心 1 个分配)的吞吐量,数学计算非常简单,因为推理使用单个核心和单个线程。
那么每分钟每分配的推理很简单:
IPM per allocation = 60,000 ms (in a minute) / 138ms per inference = 435
当与每分钟总推理次数对齐时:
Total IPM = 435 IPM / allocation * 4 Allocations = ~1740
假设我们想要执行 10,000 IPM,我需要多少个分配(核心)?
Allocations = 10,000 IPM / 435 IPM per allocation = 23 Allocation (cores rounded up)
或者也许日志以 5000 EPS 的速度传入并且你想要进行 1% 采样。
IPM = 5000 EPS * 60sec * 0.01 sampling = 3000 IPM sampled
然后:
Number of Allocators = 3000 IPM / 435 IPM per allocation = 7 allocations (cores rounded up)
想要更快!事实证明,有一种更轻量级的 NER 模型 distilbert-NER 模型,它速度更快,但代价是准确度稍低。
通过此模型运行日志可使推理时间快近两倍!
"average_inference_time_ms": 66.0263959390863
以下是一些简单的数学运算:
$IPM per allocation = 60,000 ms (in a minute) / 61ms per inference = 983
假设我们想要做 25,000 IPM,我需要多少个分配(核心)?
Allocations = 25,000 IPM / 983 IPM per allocation = 26 Allocation (cores rounded up)
现在,你可以应用此数学知识来确定正确的采样和 NER 缩放,以支持你的日志记录用例。
redact 处理器扩展
简而言之,只要你使用适当大小和配置的节点并具有构造良好的正则表达式模式,redact 处理器就应该可以扩展到生产负载。
评估传入日志
如果你想测试数据流中的传入日志数据。你需要做的就是更改 logs@custom 管道中的条件,以将 process-pii 应用于你想要的数据集。你可以使用任何适合你条件的条件。
注意:只需确保你已考虑了 NER 和 Redact 处理器的适当扩展,它们在上面的生产扩展中进行了描述。
{
"pipeline": {
"description" : "Call the process_pii pipeline on the correct dataset",
"if": "ctx?.data_stream?.dataset == 'pii'",
因此,例如,如果你的日志进入 logs-mycustomapp-default,你只需将条件更改为:
"if": "ctx?.data_stream?.dataset == 'mycustomapp'",
评估历史数据
如果你有历史(已摄取)数据流或索引,则可以使用 _reindex API 对它们进行评估。
注意:只需确保你已考虑了 NER 和 Redact 处理器的适当扩展,它们在生产扩展中进行了描述。
有几个额外的步骤:代码可以在这里找到。
1)首先,我们可以将参数设置为仅保留采样数据,因为没有理由复制所有未采样数据。在 process-pii 管道中,有一个设置 sample.keep_unsampled,我们可以将其设置为 false,这样就只保留采样数据:
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": false
2)其次,我们将创建一个管道,将数据重新路由到正确的数据流,以运行所有 PII 评估/检测管道。它还设置了正确的数据集和命名空间:
DELETE _ingest/pipeline/sendtopii
PUT _ingest/pipeline/sendtopii
{
"processors": [
{
"set": {
"field": "data_stream.dataset",
"value": "pii"
}
},
{
"set": {
"field": "data_stream.namespace",
"value": "default"
}
},
{
"reroute" :
{
"dataset" : "{{data_stream.dataset}}",
"namespace": "{{data_stream.namespace}}"
}
}
]
}
3)最后,我们可以运行 _reindex 来选择我们想要测试/评估的数据。建议在尝试之前查看 _reindex 文档。首先,选择要评估的源数据流,在此示例中,它是 logs-generic-default 日志数据流。注意:我还添加了范围过滤器以选择特定的时间范围。由于我们正在将数据重新路由到数据流 logs-pii-default,因此我们需要使用一些 “技巧”。为此,我们只需在 _reindex 中设置 “index”: “logs-tmp-default”,因为正确的数据流将在管道中设置。我们必须这样做,因为如果从/向同一数据流调用 reroute,则它是 noop。
POST _reindex?wait_for_completion=false
{
"source": {
"index": "logs-generic-default",
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1h/h",
"lt": "now"
}
}
}
]
}
}
},
"dest": {
"op_type": "create",
"index": "logs-tmp-default",
"pipeline": "sendtopii"
}
}
总结
此时,你已拥有评估、检测、分析、警告和保护日志中 PII 所需的工具和流程。
最终状态解决方案可在此处找到:。
在本博客的第 1 部分中,我们完成了以下工作。
- 回顾了我们可用于 PII 检测和评估的技术和工具
- 回顾了 NLP / NER 在 PII 检测和评估中的作用
- 构建了必要的可组合摄取管道以对日志进行采样并通过 NER 模型运行它们
- 回顾了 NER 结果并准备转到第二篇博客
在本博客的第 2 部分中,我们介绍了以下内容:
- 使用 NER 和编辑处理器编辑 PII
- 应用字段级安全性来控制对未编辑数据的访问
- 增强仪表板和警报
- 生产注意事项和扩展
- 如何在传入或历史数据上运行这些流程
因此,开始工作并降低日志中的风险吧!
数据加载附录
代码
数据加载代码可在此处找到:https://github.com/bvader/elastic-pii
$ git clone https://github.com/bvader/elastic-pii.git
创建并加载示例数据集
$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker
运行日志生成器:
$ python generate_random_logs.py
如果不更改任何参数,这将在名为 pii.log 的文件中创建 10000 个随机日志,其中包含包含和不包含 PII 的日志。
编辑 load_logs.py 并设置以下内容
# The Elastic User
ELASTIC_USER = "elastic"
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"
# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="
然后运行以下命令:
$ python load_logs.py
重新加载日志
注意要重新加载日志,你只需重新运行上述命令即可。在此练习期间,你可以多次运行该命令,日志将被重新加载(实际上是再次加载)。新日志不会与之前的运行发生冲突,因为每次运行都会有一个唯一的 run.id,它会在加载过程结束时显示。
$ python load_logs.py