请启用 Javascript 以查看内容

使用 AWS Lambda 和 SNS 监控 S3 存储桶中的文件夹

 ·  ☕ 5 分钟  ·  ✍ CNSRE · 👀... 阅读

作者:SRE运维博客
博客地址:https://www.cnsre.cn/
文章地址:https://www.cnsre.cn/posts/240920163523/
相关话题:https://www.cnsre.cn/tags/lambda/


背景与需求

在许多数据分析和处理工作流中,数据会被定期上传到 Amazon S3 存储桶中的特定目录。为了确保数据同步任务的正常运行,我们需要监控这些目录是否按预期创建了新的子文件夹。如果某个目录下未生成新的子文件夹,可能意味着数据同步任务失败或出现了异常。这时,及时获取通知可以帮助运维人员迅速采取措施,避免数据丢失或延迟。

解决方案概述

本文将介绍一个基于 AWS Lambda 的解决方案,具体步骤如下:

  1. 触发机制:定期触发 Lambda 函数(可以使用 CloudWatch Events/ EventBridge)。
  2. 检查逻辑:Lambda 函数检查指定的 S3 存储桶和前缀(文件夹)下是否存在新的子文件夹。
  3. 通知机制:如果某个前缀下未发现新的子文件夹,Lambda 函数将通过 SNS 发送通知,提醒相关人员进行检查。

详细代码解析

下面是实现上述功能的 Python 代码示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

import boto3
from datetime import datetime
from dateutil import tz

def lambda_handler(event, context):
    print('Lambda 函数已启动.')
    
    s3 = boto3.resource('s3')
    bucket_name = 'hc-data-analytics'
    local_tz = tz.gettz('Asia/Shanghai')
    now = datetime.now()
    date_prefix = now.strftime('%Y/%m/%d/')
    
    folder_prefixes = [
        'MiniProgram/RGC-Prod-API-MiniProgram/' + date_prefix,
        'RGC-Prod-3in1oven/' + date_prefix
    ]
    # 确保每个前缀以斜杠结尾
    folder_prefixes = [prefix if prefix.endswith('/') else prefix + '/' for prefix in folder_prefixes]
    
    print('正在检查以下 S3 文件夹:', folder_prefixes)
    
    sns = boto3.client('sns')
    topic_arn = 'arn:aws-cn:sns:cn-north-1:1234567890:s3-logs-monitoring'
    
    for prefix in folder_prefixes:
        resp = s3.meta.client.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
        subfolders = [p['Prefix'] for p in resp.get('CommonPrefixes', [])]
    
        if len(subfolders) > 0:
            print(f"子文件夹 '{prefix}' 存在:")
            for folder in subfolders:
                print(f"发现子文件夹: {folder}")
        else:
            message = f"S3桶'{bucket_name}'中'{prefix}'下不存在新增文件夹, 即日志同步S3桶任务失败.请检查."
            sns.publish(TopicArn=topic_arn, Message=message)
            print(f"已发送 SNS 消息: {message}")
        
    print('Lambda 函数已完成.')
    
    return {
        'statusCode': 200,
        'body': 'S3 文件夹存在性检查已完成.'
    }

代码详解

  1. 导入必要的库

    1
    2
    3
    
    import boto3
    from datetime import datetime
    from dateutil import tz
    
    • boto3:AWS SDK for Python,用于与 AWS 服务交互。
    • datetimedateutil.tz:处理日期和时区。
  2. 初始化 S3 资源和相关变量

    1
    2
    3
    4
    5
    
    s3 = boto3.resource('s3')
    bucket_name = 'hc-data-analytics'
    local_tz = tz.gettz('Asia/Shanghai')
    now = datetime.now()
    date_prefix = now.strftime('%Y/%m/%d/')
    
    • 指定要监控的 S3 存储桶名称。
    • 获取当前时间,并格式化为 YYYY/MM/DD/ 的前缀,用于定位当天的文件夹。
  3. 定义需要检查的文件夹前缀

    1
    2
    3
    4
    5
    6
    
    folder_prefixes = [
        'MiniProgram/RGC-Prod-API-MiniProgram/' + date_prefix,
        'RGC-Prod-3in1oven/' + date_prefix
    ]
    # 确保每个前缀以斜杠结尾
    folder_prefixes = [prefix if prefix.endswith('/') else prefix + '/' for prefix in folder_prefixes]
    
    • 列出需要检查的两个主要前缀,并确保每个前缀以斜杠结尾,以正确匹配 S3 的目录结构。
  4. 初始化 SNS 客户端

    1
    2
    
    sns = boto3.client('sns')
    topic_arn = 'arn:aws-cn:sns:cn-north-1:1234567890:s3-logs-monitoring'
    
    • 指定 SNS 主题的 ARN,用于发送通知。
  5. 遍历每个前缀并检查子文件夹

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    
    for prefix in folder_prefixes:
        resp = s3.meta.client.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
        subfolders = [p['Prefix'] for p in resp.get('CommonPrefixes', [])]
    
        if len(subfolders) > 0:
            print(f"子文件夹 '{prefix}' 存在:")
            for folder in subfolders:
                print(f"发现子文件夹: {folder}")
        else:
            message = f"S3桶'{bucket_name}'中'{prefix}'下不存在新增文件夹, 即日志同步S3桶任务失败.请检查."
            sns.publish(TopicArn=topic_arn, Message=message)
            print(f"已发送 SNS 消息: {message}")
    
    • 使用 list_objects_v2 方法列出指定前缀下的子文件夹。
    • 如果存在子文件夹,记录日志。
    • 如果不存在子文件夹,构造错误消息并通过 SNS 发送通知。
  6. 返回执行结果

    1
    2
    3
    4
    5
    
    
    return {
        'statusCode': 200,
        'body': 'S3 文件夹存在性检查已完成.'
    }
    
    • Lambda 函数执行完成后,返回 HTTP 200 状态码和简单的消息体。

部署与配置

1. 创建 SNS 主题

  • 登录到 AWS 管理控制台,导航到 Amazon SNS 服务。
  • 创建一个新的 SNS 主题,记下其 ARN(例如,本文代码中的 arn:aws-cn:sns:cn-north-1:1234567890:s3-logs-monitoring)。
  • 配置订阅者(如电子邮件地址),以便在收到通知时接收消息。

2. 创建 IAM 角色

Lambda 函数需要适当的权限才能访问 S3 和 SNS:

  • 创建一个 IAM 角色,附加以下策略:
    • AmazonS3ReadOnlyAccess:允许读取 S3 存储桶内容。
    • AmazonSNSFullAccess 或自定义策略,允许发布到指定的 SNS 主题。
  • 确保 Lambda 函数使用此角色运行。

3. 部署 Lambda 函数

  • 登录到 AWS 管理控制台,导航到 Lambda 服务。
  • 创建一个新的 Lambda 函数,选择 Python 3.x 运行时。
  • 将上述代码粘贴到函数代码编辑器中。
  • 配置环境变量(如有需要)。
  • 设置执行角色为前面创建的 IAM 角色。
  • 配置触发器,可以使用 Amazon EventBridge(之前的 CloudWatch Events)来定期触发 Lambda 函数,例如每天运行一次。

4. 配置 EventBridge 触发器

  • 在 Lambda 函数的“配置”选项卡中,添加一个新的触发器。
  • 选择 EventBridge(CloudWatch Events),并配置一个定时表达式(如 cron(0 0 * * ? *) 表示每天午夜触发一次)。
  • 保存配置,确保触发器已启用。

最佳实践与优化

  1. 错误处理:当前代码在异常情况下可能会中断。建议添加 try-except 块,捕获并处理可能的异常,确保 Lambda 函数的稳定性。

    1
    2
    3
    4
    5
    6
    7
    8
    
    
    try:
        # 主要逻辑
    except Exception as e:
        error_message = f"Lambda 函数执行失败: {str(e)}"
        sns.publish(TopicArn=topic_arn, Message=error_message)
        print(error_message)
        raise e
    
  2. 日志记录:利用 AWS CloudWatch Logs 记录详细的日志,便于后续的排查和监控。

  3. 参数化配置:将存储桶名称、前缀列表、SNS 主题 ARN 等参数化,通过环境变量或配置文件管理,增强灵活性。

  4. 性能优化:对于大规模存储桶,可以考虑使用分页机制(ContinuationToken)处理大量对象,避免漏检。

  5. 安全性:确保 IAM 角色只授予必要的最小权限,遵循最小权限原则,增强安全性。

总结

通过结合 AWS Lambda、S3 和 SNS,我们可以轻松实现对 S3 存储桶中特定文件夹的监控,并在出现异常时及时发送通知。这种无服务器的解决方案不仅灵活高效,而且具有高度的可扩展性,适用于各种自动化监控和运维场景。希望本文提供的代码示例和详细解析能帮助您在实际项目中快速实现类似的功能。



作者:SRE运维博客
博客地址:https://www.cnsre.cn/
文章地址:https://www.cnsre.cn/posts/240920163523/
相关话题:https://www.cnsre.cn/tags/lambda/


您的鼓励是我最大的动力
alipay QR Code
wechat QR Code

Avatar
作者
CNSRE
一位只会重启的运维


目录