个性化阅读
专注于IT技术分析

ELK到AWS:更少麻烦地管理日志

本文概述

Elasticsearch是功能强大的软件解决方案, 旨在快速搜索大量数据中的信息。与Logstash和Kibana结合使用, 这形成了非正式的” ELK栈”, 通常用于收集, 临时存储, 分析和可视化日志数据。通常需要其他一些软件, 例如Filebeat将日志从服务器发送到Logstash, Elastalert根据存储在Elasticsearch中的数据的分析结果生成警报。

ELK栈功能强大, 但是…

我使用ELK来管理日志的经验非常复杂。一方面, 它非常强大, 功能范围也非常令人印象深刻。另一方面, 设置起来很棘手, 维护起来很麻烦。

事实是, Elasticsearch总的来说非常好, 可以在多种情况下使用。它甚至可以用作搜索引擎!由于它不是专门用于管理日志数据的, 因此需要更多的配置工作来针对管理此类数据的特定需求自定义其行为。

设置ELK集群非常棘手, 需要我使用许多参数才能最终启动并运行它。然后是配置它的工作。就我而言, 我需要配置五种不同的软件(Filebeat, Logstash, Elasticsearch, Kibana和Elastalert)。这可能是一件非常繁琐的工作, 因为我必须通读文档并调试链中与下一个无关的一个元素。即使在最终启动并运行群集之后, 你仍然需要对其执行常规维护操作:修补, 升级OS软件包, 检查CPU, RAM和磁盘使用情况, 根据需要进行细微调整等。

Logstash更新后, 我的整个ELK栈停止工作。经过仔细检查, 结果发现, 由于某种原因, ELK开发人员决定更改其配置文件中的关键字并将其复数。那是最后一根稻草, 因此决定寻找更好的解决方案(至少是针对我的特殊需求的更好解决方案)。

我想存储由Apache以及各种PHP和节点应用程序生成的日志, 并对其进行解析以找到指示该软件中的错误的模式。我发现的解决方案如下:

  • 在目标上安装CloudWatch Agent。
  • 配置CloudWatch代理以将日志发送到CloudWatch日志。
  • 触发Lambda函数的调用以处理日志。
  • 如果找到了模式, 则Lambda函数会将消息发布到Slack通道。
  • 在可能的情况下, 将过滤器应用于CloudWatch日志组, 以避免为每个日志调用Lambda函数(这可能会迅速增加成本)。

而且, 从高层次来看, 就是这样!一种100%无服务器解决方案, 无需维护即可正常运行, 并且无需任何额外努力即可很好地扩展。这种无服务器解决方案在服务器群集上的优势很多:

  • 本质上, 你将在群集服务器上定期执行的所有例行维护操作现在由云提供商负责。任何基础服务器都将在你不知情的情况下为你进行修补, 升级和维护。
  • 你无需再监视集群, 而将所有扩展问题委托给云提供商。的确, 如上所述的无服务器设置将自动扩展, 而你无需执行任何操作!
  • 上述解决方案需要较少的配置, 并且云提供商不太可能将重大更改引入配置格式。
  • 最后, 编写一些CloudFormation模板以将所有内容都作为基础结构代码编写起来非常容易。要设置整个ELK集群, 需要执行大量操作。

配置松弛警报

因此, 现在让我们进入细节!让我们探讨一下这种设置的CloudFormation模板的外观, 以及用于提醒工程师的Slack webhooks。我们需要先配置所有Slack设置, 所以让我们深入研究它。

AWSTemplateFormatVersion: 2010-09-09

Description: Setup log processing

Parameters:
  SlackWebhookHost:
  	Type: String
  	Description: Host name for Slack web hooks
  	Default: hooks.slack.com

  SlackWebhookPath:
  	Type: String
  	Description: Path part of the Slack webhook URL
  	Default: /services/YOUR/SLACK/WEBHOOK

你将需要为此设置Slack工作空间, 请查看此WebHooks for Slack指南以获取其他信息。

一旦创建了Slack应用程序并配置了传入的挂钩, 挂钩URL将成为CloudFormation栈的参数。

Resources:
  ApacheAccessLogGroup:
  	Type: AWS::Logs::LogGroup
  	Properties:
  	RetentionInDays: 100  # Or whatever is good for you

  ApacheErrorLogGroup:
  	Type: AWS::Logs::LogGroup
  	Properties:
  	RetentionInDays: 100  # Or whatever is good for you

在这里, 我们创建了两个日志组:一个用于Apache访问日志, 另一个用于Apache错误日志。

我没有为日志数据配置任何生命周期机制, 因为它不在本文讨论范围之内。实际上, 你可能希望缩短保留期, 并设计S3生命周期策略, 以便在一定时间后将其移至Glacier。

Lambda函数来处理访问日志

现在, 让我们实现Lambda函数, 该函数将处理Apache访问日志。

BasicLambdaExecutionRole:
	Type: AWS::IAM::Role
	Properties:
  AssumeRolePolicyDocument:
  Version: 2012-10-17
  Statement:
  - Effect: Allow
  Principal:
  Service: lambda.amazonaws.com
  Action: sts:AssumeRole
  ManagedPolicyArns:
  - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

在这里, 我们创建了一个IAM角色, 该角色将附加到Lambda函数, 以允许他们执行职责。实际上, AWSLambdaBasicExecutionRole(尽管其名称)是由AWS提供的IAM策略。它仅允许Lambda函数创建其日志组和该组中的日志流, 然后将其自己的日志发送到CloudWatch Logs。

ProcessApacheAccessLogFunction:
	Type: AWS::Lambda::Function
	Properties:
  Handler: index.handler
  Role: !GetAtt BasicLambdaExecutionRole.Arn
  Runtime: python3.7
  Timeout: 10
  Environment:
  Variables:
  SLACK_WEBHOOK_HOST: !Ref SlackWebHookHost
  SLACK_WEBHOOK_PATH: !Ref SlackWebHookPath
  Code:
  ZipFile: |
  import base64
  import gzip
  import json
  import os
  from http.client import HTTPSConnection

  def handler(event, context):
  tmp = event['awslogs']['data']
  # `awslogs.data` is base64-encoded gzip'ed JSON
  tmp = base64.b64decode(tmp)
  tmp = gzip.decompress(tmp)
  tmp = json.loads(tmp)
  events = tmp['logEvents']
  for event in events:
  raw_log = event['message']
  log = json.loads(raw_log)
  if log['status'][0] == "5":
    # This is a 5XX status code
    print(f"Received an Apache access log with a 5XX status code: {raw_log}")
    slack_host = os.getenv('SLACK_WEBHOOK_HOST')
    slack_path = os.getenv('SLACK_WEBHOOK_PATH')
    print(f"Sending Slack post to: host={slack_host}, path={slack_path}, url={url}, content={raw_log}")
    cnx = HTTPSConnection(slack_host, timeout=5)
    cnx.request("POST", slack_path, json.dumps({'text': raw_log}))
    # It's important to read the response; if the cnx is closed too quickly, Slack might not post the msg
    resp = cnx.getresponse()
    resp_content = resp.read()
    resp_code = resp.status
    assert resp_code == 200

因此, 在这里我们定义一个Lambda函数来处理Apache访问日志。请注意, 我没有使用Apache的默认默认日志格式。我以这种方式配置了访问日志格式(你会注意到, 它本质上会生成格式化为JSON的日志, 这使得进一步处理该行变得容易得多):

LogFormat "{\"vhost\": \"%v:%p\", \"client\": \"%a\", \"user\": \"%u\", \"timestamp\": \"%{%Y-%m-%dT%H:%M:%S}t\", \"request\": \"%r\", \"status\": \"%>s\", \"size\": \"%O\", \"referer\": \"%{Referer}i\", \"useragent\": \"%{User-Agent}i\"}" json

该Lambda函数是用Python 3编写的。它采用了CloudWatch发送的日志行并可以搜索模式。在上面的示例中, 它仅检测到HTTP请求, 该请求导致生成5XX状态代码, 并将消息发布到Slack通道。

你可以在模式检测方面做任何你想做的事情, 而且它是一种真正的编程语言(Python), 而不是Logstash或Elastalert配置文件中的正则表达式模式, 这一事实为你提供了很多实现复杂模式识别的机会。

版本控制

关于修订控制的简短说明:我发现在CloudFormation模板中内联用于小型实用Lambda函数(例如此函数)的代码是完全可以接受和方便的。当然, 对于包含许多Lambda函数和层的大型项目, 这很可能很不方便, 你将需要使用SAM。

ApacheAccessLogFunctionPermission:
	Type: AWS::Lambda::Permission
	Properties:
  FunctionName: !Ref ProcessApacheAccessLogFunction
  Action: lambda:InvokeFunction
  Principal: logs.amazonaws.com
  SourceArn: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*

上面的内容授予了CloudWatch Logs调用Lambda函数的权限。一个警告:我发现使用SourceAccount属性可能导致与SourceArn的冲突。

一般来说, 当调用Lambda函数的服务在同一AWS账户中时, 我建议不要将其包括在内。 SourceArn仍将禁止其他帐户调用Lambda函数。

ApacheAccessLogSubscriptionFilter:
	Type: AWS::Logs::SubscriptionFilter
	DependsOn: ApacheAccessLogFunctionPermission
	Properties:
  LogGroupName: !Ref ApacheAccessLogGroup
  DestinationArn: !GetAtt ProcessApacheAccessLogFunction.Arn
  FilterPattern: "{$.status = 5*}"

订阅筛选器资源是CloudWatch Logs和Lambda之间的链接。在这里, 发送到ApacheAccessLogGroup的日志将被转发到我们上面定义的Lambda函数, 但是只有通过过滤器模式的那些日志。在这里, 过滤器模式需要一些JSON作为输入(过滤器模式以” {“开头, 以”}”结尾), 并且仅在其字段状态以” 5″开头时才匹配日志条目。

这意味着仅当Apache返回的HTTP状态代码为500代码时, 我们才调用Lambda函数, 这通常意味着发生了非常糟糕的事情。这样可以确保我们不会过多调用Lambda函数, 从而避免了不必要的费用。

可以在Amazon CloudWatch文档中找到有关过滤器模式的更多信息。 CloudWatch过滤器模式非常好, 尽管显然不如Grok强大。

请注意DependsOn字段, 该字段可确保CloudWatch Logs在创建订阅之前实际可以调用Lambda函数。这只是锦上添花, 在实际情况下这很可能是不必要的, Apache可能至少在几秒钟之前不会收到请求(例如:将EC2实例与负载均衡器链接并获取负载)平衡器将EC2实例的状态视为正常)。

Lambda函数处理错误日志

现在让我们看一下将处理Apache错误日志的Lambda函数。

ProcessApacheErrorLogFunction:
	Type: AWS::Lambda::Function
	Properties:
  Handler: index.handler
  Role: !GetAtt BasicLambdaExecutionRole.Arn
  Runtime: python3.7
  Timeout: 10
  Environment:
  Variables:
  SLACK_WEBHOOK_HOST: !Ref SlackWebHookHost
  SLACK_WEBHOOK_PATH: !Ref SlackWebHookPath
  Code:
  ZipFile: |
  import base64
  import gzip
  import json
  import os
  from http.client import HTTPSConnection

  def handler(event, context):
  tmp = event['awslogs']['data']
  # `awslogs.data` is base64-encoded gzip'ed JSON
  tmp = base64.b64decode(tmp)
  tmp = gzip.decompress(tmp)
  tmp = json.loads(tmp)
  events = tmp['logEvents']
  for event in events:
  raw_log = event['message']
  log = json.loads(raw_log)
  if log['level'] in ["error", "crit", "alert", "emerg"]:
    # This is a serious error message
    msg = log['msg']
    if msg.startswith("PHP Notice") or msg.startswith("PHP Warning"):
    print(f"Ignoring PHP notices and warnings: {raw_log}")
    else:
    print(f"Received a serious Apache error log: {raw_log}")
    slack_host = os.getenv('SLACK_WEBHOOK_HOST')
    slack_path = os.getenv('SLACK_WEBHOOK_PATH')
    print(f"Sending Slack post to: host={slack_host}, path={slack_path}, url={url}, content={raw_log}")
    cnx = HTTPSConnection(slack_host, timeout=5)
    cnx.request("POST", slack_path, json.dumps({'text': raw_log}))
    # It's important to read the response; if the cnx is closed too quickly, Slack might not post the msg
    resp = cnx.getresponse()
    resp_content = resp.read()
    resp_code = resp.status
    assert resp_code == 200

第二个Lambda函数处理Apache错误日志, 并且仅在遇到严重错误时才将消息发布到Slack。在这种情况下, PHP通知和警告消息被认为不够严重, 无法触发警报。

同样, 此函数希望Apache错误日志采用JSON格式。所以这是我一直在使用的错误日志格式字符串:

ErrorLogFormat "{\"vhost\": \"%v\", \"timestamp\": \"%{cu}t\", \"module\": \"%-m\", \"level\": \"%l\", \"pid\": \"%-P\", \"tid\": \"%-T\", \"oserror\": \"%-E\", \"client\": \"%-a\", \"msg\": \"%M\"}"
ApacheErrorLogFunctionPermission:
	Type: AWS::Lambda::Permission
	Properties:
  FunctionName: !Ref ProcessApacheErrorLogFunction
  Action: lambda:InvokeFunction
  Principal: logs.amazonaws.com
  SourceArn: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*
  SourceAccount: !Ref AWS::AccountId

此资源向CloudWatch Logs授予调用Lambda函数的权限。

ApacheErrorLogSubscriptionFilter:
	Type: AWS::Logs::SubscriptionFilter
	DependsOn: ApacheErrorLogFunctionPermission
	Properties:
  LogGroupName: !Ref ApacheErrorLogGroup
  DestinationArn: !GetAtt ProcessApacheErrorLogFunction.Arn
  FilterPattern: '{$.msg != "PHP Warning*" && $.msg != "PHP Notice*"}'

最后, 我们使用Apache错误日志组的订阅过滤器将CloudWatch Logs与Lambda函数链接在一起。请注意过滤器模式, 该模式可确保以” PHP警告”或” PHP通知”开头的消息日志不会触发对Lambda函数的调用。

最后的想法, 定价和可用性

关于成本的最后一句话:此解决方案比运行ELK集群便宜得多。 CloudWatch中存储的日志的价格与S3相同, Lambda作为其免费套餐的一部分, 每月允许一百万次呼叫。如果你使用的是CloudWatch Logs过滤器, 那么这对于中等流量或繁忙的网站可能就足够了, 特别是如果你编写的代码正确并且没有太多错误!

另外, 请注意, Lambda函数最多支持1, 000个并发调用。在撰写本文时, 这是AWS中的硬限制, 无法更改。但是, 你可以期望上述函数的调用持续约30-40ms。这应该足够快以应付大量流量。如果你的工作量如此之大以至于你无法达到此极限, 那么你可能需要基于Kinesis的更复杂的解决方案, 我可能会在以后的文章中介绍。

赞(0)
未经允许不得转载:srcmini » ELK到AWS:更少麻烦地管理日志

评论 抢沙发

评论前必须登录!