티스토리 뷰

개요

원래는 teleport를 도입하면서 접근제어와 감사로그를 함께 해결하려고 했다. 그러나 teleport를 쿠버네티스 외부 환경에서 구축하려면 인증 서버와 Bastion 서버, EC2 인스턴스 두 개가 필수적으로 필요했다. 비용이 엄청 크진 않지만, AWS 인프라 관리는 내가 하지만 비용을 다른 팀에서 부담하는 모순적인 구조라 인프라를 마구마구 늘릴수가 없었다.

 

그래서 접근제어는 SSM 기반 접근 방식을 이용하기로 했다. 조금 손이 가긴 하지만 보안적으로 가장 안전하다.

어떻게 구축했는지는 이전 글에 있다.

Private Subnet에 있는 AWS RDS에 접근하기 2. Bastion + SSM with. DBeaver

 

그렇다면 감사로그는 어떻게 찍어야할까? 다행히 PostgresSQL에서 제공하는 방식이 있었다.

https://docs.aws.amazon.com/ko_kr/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.pgaudit.html

 

파라미터 그룹에 있는 옵션인 pgAudit을 이용하는 방식인데, 이러면 모든 사용자가 데이터베이스로 요청한 쿼리를 자동 저장 할 수 있다. 여기서 끝났으면 좋았겠지만, 비용 최적화를 위해 몇 가지 단계를 거쳐야한다.

 

1. PostgreSQL RDS에서 pgAudit 옵션을 활성화한 파라미터 그룹 갱신

2. cloudwatch로 로그 전송 - 최소 저장기간 지정 

3. firehose로 데이터 추출

4. Lambda 에서 검색하기 좋은 형태로 데이터 정제

5. S3에 저장

6. Athena로 조회

 

말그대로 비용절감을 위해서 저렴한 AWS 인프라만 사용했고, 한달 예상되는 금액은 bastion을 포함해 $10가 안된다. 조회할일이 많으면 조금 문제가 있긴하지만 사실상 그럴 일이 없다.

 

전체 파이프라인을 보면 다음과 같다

 

 

이번에도 인프라는 Terraform으로 작성했다. 이번엔 사용하는 인프라가 많은 만큼 코드도 복잡하다.

Terraform 코드

1. RDS에서 파라미터 그룹 설정

# ------------------------- RDS Cluster -------------------------
resource "aws_rds_cluster" "rds_cluster" {
  cluster_identifier = "myapp-cluster"

  engine               = "aurora-postgresql"
  engine_mode          = "provisioned"
  engine_version       = "15.10"

  db_cluster_parameter_group_name = aws_rds_cluster_parameter_group.pg_audit.name
  enabled_cloudwatch_logs_exports = ["postgresql"]

  ...
}

# ------------------------- RDS Cluster Parameter Group -------------------------
resource "aws_rds_cluster_parameter_group" "pg_audit" {
  name        = "myapp-pgaudit"
  family      = "aurora-postgresql15"
  description = "pgAudit settings for Aurora PostgreSQL"

  # pgaudit 확장 사용을 위한 설정
  parameter {
    name         = "shared_preload_libraries"
    value        = "pgaudit"
    apply_method = "pending-reboot"
  }

  # 감사 로그에 기록할 항목들
  parameter {
    name  = "pgaudit.log"
    value = "read, write, ddl, role, function"
  }

  # 기본 SQL statement 로그 비활성화
  parameter {
    name  = "log_statement"
    value = "none"
  }

  # 로그 대상 설정 (stderr → CloudWatch 로그로 연동 가능)
  parameter {
    name  = "log_destination"
    value = "stderr"
  }
}

2. RDS에서 pgaudit 로그가 나올때 받아주는 cloudwatch 로그 그룹 : 비용때문에 저장기간은 1일로 고정

# ------------------------- CloudWatch 로그 그룹 -------------------------
resource "aws_cloudwatch_log_group" "rds_postgresql" {
  name  = "/aws/rds/cluster/app-prod/postgresql"

  retention_in_days = 1

  lifecycle {
    prevent_destroy = true
    ignore_changes  = [name]
  }
}

아래와 같이 로그가 저장된다. 일단은 원하는대로 잘 나오는데....

 

 

3. firehose로 데이터 S3로 추출 

이대로 AWS Kinesis firehose로 S3로 뽑아 올 수 있다. 일단 현재 서비스의 로그 크기보단 한참 큰 10MB/day까진 프리티어다. 가격도 저렴한데, 이름도 멋지다. S3부터 추가하자.

 

# ------------------------- S3 저장 버킷 -------------------------
resource "aws_s3_bucket" "audit_logs" {

  bucket        = "audit-logs-myapp"
  force_destroy = true

  tags = {
    Name = "audit-logs"
  }
}

# ------------------------- S3 수명주기 정책 -------------------------
resource "aws_s3_bucket_lifecycle_configuration" "audit_logs_lifecycle" {
  bucket = aws_s3_bucket.audit_logs.id

  rule {
    id     = "expire-after-1y"
    status = "Enabled"

    expiration {
      days = 365 # 1년 후 자동 만료
    }
  }
}

 

4. Kinesis firehose 추가

IAM이 많이 필요하고 athena에서 데이터 조회를 위해서 prefix를 지정해줬다. error 로그를 남기기위한 디렉토리도 필요함..

# ------------------------- Kinesis Firehose -------------------------
resource "aws_kinesis_firehose_delivery_stream" "audit_firehose" {
  name        = "audit-firehose"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn           = aws_iam_role.firehose_role.arn
    bucket_arn         = aws_s3_bucket.audit_logs.arn
    buffering_interval = 60
    buffering_size     = 64
    compression_format = "GZIP"

    prefix              = "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    error_output_prefix = "errors/!{firehose:error-output-type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/"

    processing_configuration {
      enabled = true
      processors {
        type = "Lambda"
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = aws_lambda_function.audit_log_processor.arn
        }
      }
    }
  }
}

# ------------------------- CloudWatch → Firehose 연결 -------------------------
resource "aws_cloudwatch_log_subscription_filter" "audit_subscription" {
  name            = "audit-to-firehose"
  log_group_name  = aws_cloudwatch_log_group.rds_postgresql.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_firehose_delivery_stream.audit_firehose.arn
  role_arn        = aws_iam_role.firehose_role.arn

  depends_on = [aws_kinesis_firehose_delivery_stream.audit_firehose]
}

# ------------------------- IAM Role for Firehose -------------------------
resource "aws_iam_role" "firehose_role" {
  name = "firehose_to_s3_audit"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect = "Allow",
      Principal = {
        Service = ["firehose.amazonaws.com", "logs.amazonaws.com"]
      },
      Action = "sts:AssumeRole"
    }]
  })
}

# ------------------------- IAM Policy for Firehose Role -------------------------
resource "aws_iam_role_policy" "firehose_policy" {
  role = aws_iam_role.firehose_role.id

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "s3:PutObject",
          "s3:PutObjectAcl",
          "s3:PutObjectTagging",
          "s3:GetBucketLocation",
          "s3:ListBucket"
        ],
        Resource = [
          "${aws_s3_bucket.audit_logs.arn}",
          "${aws_s3_bucket.audit_logs.arn}/*"
        ]
      },
      {
        Effect = "Allow",
        Action = [
          "firehose:DescribeDeliveryStream",
          "firehose:PutRecord",
          "firehose:PutRecordBatch"
        ],
        Resource = "*"
      },
      {
        Effect = "Allow",
        Action = [
          "logs:PutSubscriptionFilter",
          "logs:DescribeLogGroups",
          "logs:GetLogEvents"
        ],
        Resource = "*"
      },
      {
        Effect = "Allow",
        Action = [
          "lambda:InvokeFunction",
          "lambda:GetFunctionConfiguration"
        ],
        Resource = aws_lambda_function.audit_log_processor.arn
      }
    ]
  })
}

이대로 추출하면 데이터 형태가 아래와 같이 나온다.

{
  "messageType": "DATA_MESSAGE",
  "owner": "************",
  "logGroup": "/aws/rds/cluster/myapp-prod/postgresql",
  "logStream": "myapp-prod-writer",
  "subscriptionFilters": [
    "audit-to-firehose"
  ],
  "logEvents": [
    {
      "id": "*********************",
      "timestamp": 1745305163000,
      "message": "2025-04-22 06:59:23 UTC:10.1.16.186(58992):user@myapp:[14853]:LOG:  AUDIT: SESSION,115,1,READ,SELECT,,,select count(*) from example_table where id=$1,<not logged>"
    },
    {
      "id": "*********************",
      "timestamp": 1745305163000,
      "message": "2025-04-22 06:59:23 UTC:10.1.16.186(58992):user@myapp:[14853]:LOG:  AUDIT: SESSION,116,1,READ,SELECT,,,select some_column from another_table where key=$1 fetch first $2 rows only,<not logged>"
    }
  ]
}

문제는 이 JSON 형태는 Athena에서 제대로 조회하기 어려운 형태의 데이터다 그래서 lambda를 붙여줬다.

 

5. Lambda 추가

# ------------------------- Lambda 함수 정의 -------------------------
resource "aws_lambda_function" "audit_log_processor" {
  filename         = "index.zip"
  function_name    = "audit-log-parser"
  handler          = "index.handler"
  runtime          = "nodejs18.x"
  role             = aws_iam_role.firehose_lambda_exec.arn
  source_code_hash = filebase64sha256("index.zip")
  timeout          = 60
}

# ------------------------- Lambda 권한 부여 (Firehose → Lambda) -------------------------
resource "aws_lambda_permission" "allow_firehose_invoke" {
  statement_id  = "AllowExecutionFromFirehose"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.audit_log_processor.function_name
  principal     = "firehose.amazonaws.com"
  source_arn    = aws_kinesis_firehose_delivery_stream.audit_firehose.arn
}
# ------------------------- index.zip안 의 index.js 코드 -------------------------
const zlib = require("zlib");

exports.handler = async (event) => {
  const output = event.records.map((record) => {
    try {
      // GZIP 압축 해제
      const compressedPayload = Buffer.from(record.data, "base64");
      const uncompressedPayload = zlib.gunzipSync(compressedPayload).toString("utf8");

      const parsed = JSON.parse(uncompressedPayload);
      const messages = parsed.logEvents.map((e) => JSON.stringify({ message: e.message })).join("\n");

      const encodedData = Buffer.from(messages, "utf8").toString("base64");

      return {
        recordId: record.recordId,
        result: "Ok",
        data: encodedData,
      };
    } catch (e) {
      return {
        recordId: record.recordId,
        result: "ProcessingFailed",
        data: record.data,
      };
    }
  });

  return { records: output };
};

이 과정을 거치면 데이터는 아래와 같은 한줄짜리 json으로 변경된다.

{
  "message": "2025-04-22 08:23:14 UTC:10.1.42.196(55870):user@myapp:[20198]:LOG:  AUDIT: SESSION,25,1,WRITE,INSERT,,,\"insert into log_table (col1,col2,col3,...) values ($1,$2,$3,...) RETURNING *\",<not logged>"
}

 

이러면 Athena에서 조회할 수 있는 상태가 되는데 여기서부턴 GPT와 알아서 어떻게 추출할지 논의해가면서 스키마를 작성하면 된다.

 

비용

 

 

GPT가 계산해 준거긴한데, 사용한 인프라에 비해 한달에 약 $0.55 수준으로 잘 정리됐다. 문제는 데이터양이 엄청 많을 땐 더 부과될 것 같고, 조회를 서비스로직에서 처리해야한다면 athena와 연동하는 추가 파이프라인이 필요한데, 이때는 감당안되는 비용이 나올 수 있다.

 

아테나는 비싸다...

 

마치며

여기까지가 이번 프로젝트에서 진행한 접근제어와 감사로그 파이프라인 구축이었다.

 

비용절감 때문에 고민이 많았는데, AWS에서 제공하는 기능들 만으로 깔끔하면서도 저렴한 파이프라인과 보안이 보장되는 private subnet 접속이 가능해졌다. 너무 AWS 벤더에 종속되는 것 같지만 종속되서 쓰고 있는데 어떡해?

 

마지막으로 이 과정을 총정리하면 다음과 같다.

 

그냥 돈내고 쓰자. 이거 누가 관리할건데?

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함