From 12a16073895c9f3f58fcf19137b97ce0a8686f53 Mon Sep 17 00:00:00 2001 From: lyx2000 <1419360299@qq.com> Date: Tue, 9 Dec 2025 10:16:20 +0800 Subject: [PATCH 1/5] init: github event byoc demo --- .../table-topic/github-event/README.md | 0 .../event-producer/create-configmap.sh | 30 ++ .../event-producer/deployment.yaml | 82 +++++ .../event-producer/event_producer.py | 305 ++++++++++++++++++ .../event-producer/github_event.avsc | 14 + .../event-producer/requirements.txt | 5 + .../table-topic/github-event/justfile | 0 .../github-event/notebook/analysis.py | 281 ++++++++++++++++ .../github-event/notebook/create-configmap.sh | 29 ++ .../github-event/notebook/deployment.yaml | 82 +++++ .../github-event/notebook/requirements.txt | 3 + .../github-event/notebook/service.yaml | 18 ++ .../table-topic/github-event/terraform/eks.tf | 37 +++ .../github-event/terraform/main.tf | 12 + .../github-event/terraform/outputs.tf | 57 ++++ .../github-event/terraform/producer.tf | 39 +++ .../github-event/terraform/providers.tf | 62 ++++ .../github-event/terraform/s3_table.tf | 30 ++ .../github-event/terraform/variables.tf | 96 ++++++ .../table-topic/policy-eks/terraform/eks.tf | 6 + .../policy-eks/terraform/providers.tf | 25 ++ .../policy-eks/terraform/variables.tf | 76 +++++ .../setup/kubernetes/aws/terraform/main.tf | 2 +- 23 files changed, 1290 insertions(+), 1 deletion(-) create mode 100644 byoc-examples/features/table-topic/github-event/README.md create mode 100755 byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/event_producer.py create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/requirements.txt create mode 100644 byoc-examples/features/table-topic/github-event/justfile create mode 100644 byoc-examples/features/table-topic/github-event/notebook/analysis.py create mode 100755 byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh create mode 100644 byoc-examples/features/table-topic/github-event/notebook/deployment.yaml create mode 100644 byoc-examples/features/table-topic/github-event/notebook/requirements.txt create mode 100644 byoc-examples/features/table-topic/github-event/notebook/service.yaml create mode 100644 byoc-examples/features/table-topic/github-event/terraform/eks.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/main.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/outputs.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/producer.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/providers.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/s3_table.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/variables.tf create mode 100644 byoc-examples/features/table-topic/policy-eks/terraform/eks.tf create mode 100644 byoc-examples/features/table-topic/policy-eks/terraform/providers.tf create mode 100644 byoc-examples/features/table-topic/policy-eks/terraform/variables.tf diff --git a/byoc-examples/features/table-topic/github-event/README.md b/byoc-examples/features/table-topic/github-event/README.md new file mode 100644 index 0000000..e69de29 diff --git 
a/byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh b/byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh new file mode 100755 index 0000000..77a7498 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Script to create ConfigMap from source files + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PRODUCER_DIR="$(cd "${SCRIPT_DIR}" && pwd)" +CONFIGMAP_NAME="github-event-producer-code" +NAMESPACE="default" + +echo "Creating ConfigMap ${CONFIGMAP_NAME} from source files..." +echo "Source directory: ${PRODUCER_DIR}" +echo "" + +# Create ConfigMap from source files +kubectl create configmap ${CONFIGMAP_NAME} \ + --from-file="${PRODUCER_DIR}/event_producer.py" \ + --from-file="${PRODUCER_DIR}/requirements.txt" \ + --from-file="${PRODUCER_DIR}/github_event.avsc" \ + --namespace=${NAMESPACE} \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "āœ… ConfigMap ${CONFIGMAP_NAME} created/updated successfully!" +echo "" +echo "To view the ConfigMap:" +echo " kubectl get configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" +echo "" +echo "To delete the ConfigMap:" +echo " kubectl delete configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" + diff --git a/byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml b/byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml new file mode 100644 index 0000000..395aba1 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml @@ -0,0 +1,82 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: github-event-producer + namespace: default + labels: + app: github-event-producer + component: producer +spec: + replicas: 1 + selector: + matchLabels: + app: github-event-producer + template: + metadata: + labels: + app: github-event-producer + component: producer + spec: + # Use producer node group if available + nodeSelector: + node-type: producer + containers: + - name: event-producer + image: python:3.9 + command: + - sh + - -c + - | + echo "Installing Python dependencies..." + pip install --no-cache-dir -r /app/requirements.txt + echo "Starting event producer..." 
+ python /app/event_producer.py + env: + - name: KAFKA_BOOTSTRAP_SERVERS + value: "kf-pfiq444nipaz43tk.automqlab-k3f3.automq.private:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://kf-pfiq444nipaz43tk.automqlab-k3f3.automq.private:8081" + - name: TOPIC_NAME + value: "github_events_iceberg" + - name: LOG_LEVEL + value: "INFO" # Options: DEBUG, INFO, WARNING, ERROR + - name: KAFKA_DEBUG + value: "" # Set to 'all' or 'broker,topic,msg' for debug logs + - name: PYTHONUNBUFFERED + value: "1" + resources: + requests: + memory: "512Mi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "2000m" + volumeMounts: + - name: app-code + mountPath: /app + readOnly: true + livenessProbe: + exec: + command: + - /bin/sh + - -c + - "ps aux | grep -v grep | grep event_producer.py || exit 1" + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + readinessProbe: + exec: + command: + - /bin/sh + - -c + - "ps aux | grep -v grep | grep event_producer.py || exit 1" + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + volumes: + - name: app-code + configMap: + name: github-event-producer-code + restartPolicy: Always diff --git a/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py new file mode 100644 index 0000000..c6d1009 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py @@ -0,0 +1,305 @@ +import json +import gzip +import shutil +import os +import sys +import time +import logging +import requests +import dateutil.parser +from datetime import datetime, timedelta, timezone +from confluent_kafka import Producer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroSerializer +from confluent_kafka.serialization import SerializationContext, MessageField + +# Configure logging +LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper() +# Create a stream handler with unbuffered output +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# Configure root logger +root_logger = logging.getLogger() +root_logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) +root_logger.handlers = [] # Clear existing handlers +root_logger.addHandler(handler) + +logger = logging.getLogger(__name__) + +# ================= Configuration ================= +KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'automq:9092') +SCHEMA_REGISTRY_URL = os.getenv('SCHEMA_REGISTRY_URL', 'http://schema-registry:8081') +TOPIC_NAME = os.getenv('TOPIC_NAME', 'github_events_iceberg') +KAFKA_DEBUG = os.getenv('KAFKA_DEBUG', '') # Set to 'all' or 'broker,topic,msg' for debug logs + +# Read Avro Schema +schema_path = os.path.join(os.path.dirname(__file__), 'github_event.avsc') +with open(schema_path, 'r') as f: + SCHEMA_STR = f.read() + +class GithubEventMapper: + """Maps complex GH JSON to flat Avro dictionary""" + + @staticmethod + def to_timestamp(time_str): + if not time_str: return None + dt = dateutil.parser.parse(time_str) + return int(dt.timestamp() * 1000) # Convert to milliseconds + + @staticmethod + def map_event(raw): + t = raw.get('type') + + # 1. 
Basic fields + record = { + "id": raw.get("id"), + "type": t, + "created_at": GithubEventMapper.to_timestamp(raw.get("created_at")), + "actor_login": raw.get("actor", {}).get("login"), + "repo_name": raw.get("repo", {}).get("name") + } + + return record + +def format_hour_key(dt): + """Format datetime to GH Archive filename format: YYYY-MM-DD-H (single digit hour)""" + return f"{dt.strftime('%Y-%m-%d')}-{dt.hour}" + + +def get_initial_start_hour(): + """Calculate initial start time: 3 days ago (UTC)""" + now = datetime.now(timezone.utc) + start = now - timedelta(days=3) + # Round down to hour + return start.replace(minute=0, second=0, microsecond=0) + + +def get_target_hour(): + """Calculate target time: 3 hours ago (UTC)""" + now = datetime.now(timezone.utc) + target = now - timedelta(hours=3) + # Round down to hour + return target.replace(minute=0, second=0, microsecond=0) + + +def process_hour(date_str, hour, producer, avro_serializer): + """Process data for a single hour""" + start_time = time.time() + delivery_success = 0 + delivery_failed = 0 + + def delivery_report(err, msg): + nonlocal delivery_success, delivery_failed + if err: + delivery_failed += 1 + logger.error(f"āŒ Delivery failed: {err}") + else: + delivery_success += 1 + # Log detailed delivery info only at debug level + if logger.level == logging.DEBUG: + logger.debug(f"āœ… Message delivered: topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}") + + # Download & Process + file_name = f"{date_str}-{hour}.json.gz" + url = f"https://data.gharchive.org/{file_name}" + logger.info(f"šŸ“„ Processing: {url}") + + try: + # Stream download and decompress + response = requests.get(url, stream=True, timeout=300) + response.raise_for_status() + + with open(file_name, 'wb') as f: + shutil.copyfileobj(response.raw, f) + + count = 0 + with gzip.open(file_name, 'rt', encoding='utf-8') as f: + for line in f: + if not line.strip(): + continue + + try: + raw_json = json.loads(line) + # Core transformation + avro_record = GithubEventMapper.map_event(raw_json) + + # Send message, wait if queue is full + while True: + try: + producer.produce( + topic=TOPIC_NAME, + key=str(avro_record['id']), + value=avro_serializer(avro_record, SerializationContext(TOPIC_NAME, MessageField.VALUE)), + on_delivery=delivery_report + ) + break # Successfully sent, exit retry loop + except BufferError: + # Queue is full, process sent messages first, then retry + producer.poll(1) + continue + + count += 1 + # Call poll every 100 messages to ensure timely delivery + if count % 100 == 0: + producer.poll(0) + # Print progress every 2000 messages + if count % 2000 == 0: + logger.info(f" ↳ Sent {count} events...") + + except Exception as e: + logger.warning(f" āš ļø Error processing line: {e}") + continue + + # Flush remaining messages + logger.debug("Flushing producer...") + producer.flush(timeout=30) + + elapsed = time.time() - start_time + rate = count / elapsed if elapsed > 0 else 0 + logger.info(f"āœ… Completed: {count} events sent from {url} in {elapsed:.2f}s ({rate:.0f} events/sec)") + logger.info(f" Delivery stats: {delivery_success} successful, {delivery_failed} failed") + return count + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + logger.warning(f"āš ļø Data not available yet: {url} (404)") + return 0 + else: + logger.error(f"āŒ HTTP error processing {url}: {e}") + raise + except Exception as e: + logger.error(f"āŒ Error processing {url}: {e}", exc_info=True) + raise + finally: + if 
os.path.exists(file_name): + os.remove(file_name) + + +def run_continuous_producer(): + """Main loop for continuously running producer""" + logger.info("=" * 60) + logger.info("šŸš€ Starting Continuous GH Archive Producer") + logger.info("=" * 60) + + # Setup Kafka (initialize outside loop to avoid repeated creation) + logger.info("šŸ“” Initializing Kafka producer...") + logger.info(f" Bootstrap servers: {KAFKA_BOOTSTRAP_SERVERS}") + logger.info(f" Topic: {TOPIC_NAME}") + logger.info(f" Schema Registry: {SCHEMA_REGISTRY_URL}") + + try: + schema_registry_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL}) + logger.info("āœ… Schema Registry client created") + avro_serializer = AvroSerializer(schema_registry_client, SCHEMA_STR) + logger.info("āœ… Avro serializer created") + except Exception as e: + logger.error(f"āŒ Failed to initialize Schema Registry: {e}", exc_info=True) + raise + + # Configure Producer: increase queue size for higher throughput + producer_config = { + 'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS, + 'queue.buffering.max.messages': 100000, # Increase queue size + 'queue.buffering.max.kbytes': 1048576, # 1GB + 'batch.num.messages': 10000, # Batch size + 'log.connection.close': True, # Log connection close events + } + + # Add debug configuration if specified + if KAFKA_DEBUG: + producer_config['debug'] = KAFKA_DEBUG + logger.info(f"šŸ” Kafka debug enabled: {KAFKA_DEBUG}") + + try: + producer = Producer(producer_config) + logger.info("āœ… Kafka producer created") + + # Test connection + logger.info("šŸ”Œ Testing Kafka connection...") + metadata = producer.list_topics(timeout=10) + logger.info(f"āœ… Connected to Kafka. Available topics: {len(metadata.topics)}") + if TOPIC_NAME in metadata.topics: + topic_metadata = metadata.topics[TOPIC_NAME] + logger.info(f"āœ… Topic '{TOPIC_NAME}' exists with {len(topic_metadata.partitions)} partitions") + else: + logger.warning(f"āš ļø Topic '{TOPIC_NAME}' not found! 
It may be created automatically.") + except Exception as e: + logger.error(f"āŒ Failed to connect to Kafka: {e}", exc_info=True) + raise + + # Use variable to track last processed hour (stateless operation) + last_processed_hour = None + + # Determine start time + if last_processed_hour is None: + # First run, start from 3 days ago + current_hour = get_initial_start_hour() + logger.info(f"šŸ†• Starting from {format_hour_key(current_hour)} (3 days ago)") + else: + # Resume from last processed time point + current_hour = last_processed_hour + timedelta(hours=1) + logger.info(f"šŸ”„ Resuming from {format_hour_key(current_hour)}") + + cycle_count = 0 + while True: + cycle_count += 1 + logger.info("") + logger.info("=" * 60) + logger.info(f"šŸ“Š Cycle #{cycle_count} - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}") + logger.info("=" * 60) + + target_hour = get_target_hour() + logger.info(f"šŸŽÆ Target hour: {format_hour_key(target_hour)} (3 hours ago)") + logger.info(f"šŸ“ Current hour: {format_hour_key(current_hour)}") + + # Process all hours that need processing + processed_in_cycle = 0 + while current_hour < target_hour: + date_str = current_hour.strftime("%Y-%m-%d") + hour_str = str(current_hour.hour) # Single digit hour (0-23) + + try: + count = process_hour(date_str, hour_str, producer, avro_serializer) + if count > 0: + processed_in_cycle += 1 + # After successful processing, update variable and move to next hour + last_processed_hour = current_hour + current_hour += timedelta(hours=1) + else: + # Data not available (possibly 404), wait before retrying + logger.info(f"ā³ Waiting 60 seconds before retrying...") + time.sleep(60) + # Don't update current_hour, will retry in next cycle + break + + except Exception as e: + logger.error(f"āŒ Failed to process {format_hour_key(current_hour)}: {e}", exc_info=True) + logger.info(f"ā³ Waiting 60 seconds before retrying...") + time.sleep(60) + # Don't update current_hour, will retry in next cycle + break + + if processed_in_cycle > 0: + logger.info("") + logger.info(f"āœ… Cycle completed: Processed {processed_in_cycle} hours") + else: + logger.info("") + logger.info(f"āøļø No new data to process (caught up to {format_hour_key(target_hour)})") + + # Wait before next check (target time advances with current time) + wait_seconds = 300 # 5 minutes + logger.info(f"ā³ Waiting {wait_seconds} seconds before next cycle...") + time.sleep(wait_seconds) + +if __name__ == "__main__": + # Continuous mode: start from 3 days ago, continuously pull up to 3 hours ago + try: + run_continuous_producer() + except KeyboardInterrupt: + logger.info("\n\nāš ļø Producer stopped by user") + except Exception as e: + logger.error(f"\n\nāŒ Fatal error: {e}", exc_info=True) + raise \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc b/byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc new file mode 100644 index 0000000..c2aa611 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc @@ -0,0 +1,14 @@ +{ + "type": "record", + "name": "GithubEvent", + "namespace": "com.gharchive", + "doc": "Top-level structure of Github Event", + "fields": [ + { "name": "id", "type": ["null", "string"], "default": null }, + { "name": "type", "type": ["null", "string"], "default": null }, + { "name": "actor_login", "type": ["null", "string"], "default": null }, + { "name": "repo_name", "type": ["null", "string"], "default": null 
}, + { "name": "org_login", "type": ["null", "string"], "default": null }, + { "name": "created_at", "type": { "type": "long", "logicalType": "timestamp-millis" } } + ] +} diff --git a/byoc-examples/features/table-topic/github-event/event-producer/requirements.txt b/byoc-examples/features/table-topic/github-event/event-producer/requirements.txt new file mode 100644 index 0000000..9ae67aa --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/requirements.txt @@ -0,0 +1,5 @@ +requests +python-dateutil +confluent-kafka[schema-registry,avro] +attrs>=21.2.0 +fastavro diff --git a/byoc-examples/features/table-topic/github-event/justfile b/byoc-examples/features/table-topic/github-event/justfile new file mode 100644 index 0000000..e69de29 diff --git a/byoc-examples/features/table-topic/github-event/notebook/analysis.py b/byoc-examples/features/table-topic/github-event/notebook/analysis.py new file mode 100644 index 0000000..c0f391d --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/analysis.py @@ -0,0 +1,281 @@ +import marimo + +__generated_with = "0.18.0" +app = marimo.App(width="medium", app_title="GitHub Real-Time Analytics") + + +@app.cell(hide_code=True) +def _(): + # Import required libraries + import marimo as mo + import os + import awswrangler as wr + print("āœ“ Using awswrangler to query Athena") + + # Configure AWS credentials and region from environment variables + try: + print("Configuring AWS credentials...") + aws_region = os.getenv('AWS_REGION', 'us-east-1') + athena_workgroup = os.getenv('ATHENA_WORKGROUP', 'automqlab-k3f3-primary') + athena_database = os.getenv('ATHENA_DATABASE', 'event') + athena_s3_output = os.getenv('ATHENA_S3_OUTPUT', '') + + print(f"āœ“ AWS Region: {aws_region}") + print(f"āœ“ Athena Workgroup: {athena_workgroup}") + print(f"āœ“ Athena Database: {athena_database}") + if athena_s3_output: + print(f"āœ“ Athena S3 Output: {athena_s3_output}") + + # Test Athena connection + print("Testing Athena connection...") + # Set default session configuration + wr.config.s3_endpoint_url = None # Use default S3 endpoint + print("āœ“ awswrangler configured successfully") + + except Exception as e: + error_msg = f"""āŒ Error configuring awswrangler: {str(e)}""" + print(error_msg) + raise RuntimeError(error_msg) from e + + return mo, wr, aws_region, athena_workgroup, athena_database, athena_s3_output + + +@app.cell(hide_code=True) +def _(mo): + import time + + # State management + get_events, set_events = mo.state(value=[]) + get_last_update, set_last_update = mo.state(value="Never") + get_star_count, set_star_count = mo.state(value=0) + get_total_events, set_total_events = mo.state(value=0) + + return (get_events, set_events, get_last_update, set_last_update, get_star_count, set_star_count, get_total_events, set_total_events, time) + + +@app.cell +def _(mo): + mo.md(r""" + # šŸš€ GitHub Events Real-Time Analytics + + This project demonstrates how to leverage **[AutoMQ Table Topic](https://github.com/AutoMQ/automq)** to transform streaming GitHub events into Apache Iceberg format for real-time analytics. + + This solution ingests **GitHub Events** from [GH Archive](https://www.gharchive.org/) into AutoMQ, where the **Table Topic** feature automatically converts the streaming data into Apache Iceberg tables. AWS Athena can then query these tables directly using awswrangler, enabling real-time analysis of open-source community activities without the traditional ETL complexity. 
+ + **Data Source**: [GH Archive](https://www.gharchive.org/) - Public GitHub timeline events + --- + """) + return + +@app.cell +def _(mo): + # Create auto-refresh component, refresh every 60 seconds (1 minute) + dataRefresh = mo.ui.refresh(options=["60s"], default_interval="60s") + return (dataRefresh,) + + +@app.cell(hide_code=True) +def _(dataRefresh, get_events, set_events, get_last_update, set_last_update, get_star_count, set_star_count, get_total_events, set_total_events, wr, athena_workgroup, athena_database, athena_s3_output, time): + # Use mo.ui.refresh to trigger data refresh + # Key: directly use dataRefresh.value to let marimo detect changes and trigger cell re-execution + # Use in SQL query comments to ensure SQL string changes when value changes, triggering re-execution + _refresh_value = dataRefresh.value + + try: + # Configure query parameters + query_params = { + 'database': athena_database, + 'workgroup': athena_workgroup, + } + if athena_s3_output: + query_params['s3_output'] = athena_s3_output + + # 1. Get star count (WatchEvent) for last 3 days + # Note: created_at is timestamp in milliseconds, convert to date for comparison + star_query = f""" + -- Refresh trigger: {_refresh_value} + SELECT COUNT(*) as star_count + FROM {athena_database}.github_events_iceberg + WHERE type = 'WatchEvent' + AND from_unixtime(created_at / 1000) >= current_date - interval '3' day + """ + star_df = wr.athena.read_sql_query( + sql=star_query, + **query_params + ) + star_count = int(star_df.iloc[0]['star_count']) if len(star_df) > 0 else 0 + set_star_count(star_count) + + # 2. Get total event count for last 3 days + # Note: created_at is timestamp in milliseconds, convert to date for comparison + total_query = f""" + -- Refresh trigger: {_refresh_value} + SELECT COUNT(*) as total_count + FROM {athena_database}.github_events_iceberg + WHERE from_unixtime(created_at / 1000) >= current_date - interval '3' day + """ + total_df = wr.athena.read_sql_query( + sql=total_query, + **query_params + ) + total_count = int(total_df.iloc[0]['total_count']) if len(total_df) > 0 else 0 + set_total_events(total_count) + + # 3. 
Get recent event list + # Convert created_at from milliseconds to readable timestamp + recent_query = f""" + -- Refresh trigger: {_refresh_value} + SELECT + id, + type, + actor_login, + repo_name, + from_unixtime(created_at / 1000) as created_at + FROM {athena_database}.github_events_iceberg + ORDER BY created_at DESC + LIMIT 20 + """ + _pandas_df = wr.athena.read_sql_query( + sql=recent_query, + **query_params + ) + + # Update state + set_events(_pandas_df) + + # Update last refresh time + current_time = time.strftime("%H:%M:%S") + set_last_update(current_time) + + print(f"šŸ”„ [Auto-refresh] Data updated at {current_time} - Stars: {star_count}, Total Events: {total_count}, Recent Events: {len(_pandas_df)}") + + except Exception as e: + print(f"āŒ [Auto-refresh] Error refreshing data: {e}") + import traceback + traceback.print_exc() + + # Return _refresh_value to ensure marimo detects changes + return _refresh_value + + +@app.cell(hide_code=True) +def _(wr, athena_workgroup, athena_database, athena_s3_output): + # Get top 10 repositories by star count for last 3 days (no auto-refresh) + try: + query_params = { + 'database': athena_database, + 'workgroup': athena_workgroup, + } + if athena_s3_output: + query_params['s3_output'] = athena_s3_output + + top_repos_query = f""" + SELECT + repo_name, + COUNT(*) as star_count + FROM {athena_database}.github_events_iceberg + WHERE type = 'WatchEvent' + AND from_unixtime(created_at / 1000) >= current_date - interval '3' day + GROUP BY repo_name + ORDER BY star_count DESC + LIMIT 10 + """ + top_repos_pandas = wr.athena.read_sql_query( + sql=top_repos_query, + **query_params + ) + if top_repos_pandas is not None and len(top_repos_pandas) > 0: + print(f"āœ“ Loaded top {len(top_repos_pandas)} repositories") + else: + print("āš ļø No repository data found") + except Exception as e: + print(f"āŒ Error fetching top repos: {e}") + import traceback + traceback.print_exc() + top_repos_pandas = None + + return top_repos_pandas + + +@app.cell +def _(dataRefresh, mo, get_last_update, get_star_count, get_total_events): + # Note: refresh component needs to be rendered to work, so render first then hide + # Or don't hide it, let users see the refresh status + dataRefresh.style({"display": None}) + + # First row: show star count on left, total events on right + stats_row = mo.hstack([ + mo.md(f""" + ### ⭐ Recent Stars (3 days) + **{get_star_count():,}** stars + """), + mo.md(f""" + ### šŸ“Š Total Events (3 days) + **{get_total_events():,}** events + """) + ], justify="space-between") + + mo.vstack([ + mo.md("## šŸ“Š Live GitHub Events Data"), + mo.md(f"*Last updated: {get_last_update()} • Auto-refresh every 60 seconds*"), + stats_row, + dataRefresh # Ensure refresh component is rendered (even if hidden) + ]) + + +@app.cell +def _(mo, top_repos_pandas): + # Display top 10 repositories by star count for last 3 days + if top_repos_pandas is not None and len(top_repos_pandas) > 0: + top_repos_table = mo.ui.table( + top_repos_pandas, + selection=None, + show_column_summaries=False + ) + result = mo.vstack([ + mo.md("### šŸ† Top 10 Repositories by Stars (Last 3 Days)"), + top_repos_table + ]) + else: + result = mo.vstack([ + mo.md("### šŸ† Top 10 Repositories by Stars (Last 3 Days)"), + mo.md("*No data available - Please check if there are WatchEvent records in the database*") + ]) + + result + + +@app.cell +def _(get_events, mo): + events_data = get_events() + # Select only specific columns to display (modify this list as needed) + display_columns = ['id', 'type', 
'actor_login', 'repo_name', 'created_at'] + + + # Filter the DataFrame to show only selected columns + if len(events_data) > 0: + # Check which columns actually exist in the data + available_columns = [col for col in display_columns if col in events_data.columns] + filtered_data = events_data[available_columns] + + print(f"āœ“ Displaying {len(available_columns)} columns: {', '.join(available_columns)}") + else: + filtered_data = events_data + + # Create interactive table with filtered data + table = mo.ui.table( + filtered_data, + selection=None, + show_column_summaries=False + ) + + # Return the vstack as the final expression to display + mo.vstack([ + mo.md("### šŸ“‹ Recent GitHub Events"), + table + ]) + return + + +if __name__ == "__main__": + app.run() diff --git a/byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh b/byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh new file mode 100755 index 0000000..33b5d05 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Script to create ConfigMap from source files + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +NOTEBOOK_DIR="$(cd "${SCRIPT_DIR}" && pwd)" +CONFIGMAP_NAME="github-event-notebook-code" +NAMESPACE="default" + +echo "Creating ConfigMap ${CONFIGMAP_NAME} from source files..." +echo "Source directory: ${NOTEBOOK_DIR}" +echo "" + +# Create ConfigMap from source files +kubectl create configmap ${CONFIGMAP_NAME} \ + --from-file="${NOTEBOOK_DIR}/analysis.py" \ + --from-file="${NOTEBOOK_DIR}/requirements.txt" \ + --namespace=${NAMESPACE} \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "āœ… ConfigMap ${CONFIGMAP_NAME} created/updated successfully!" +echo "" +echo "To view the ConfigMap:" +echo " kubectl get configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" +echo "" +echo "To delete the ConfigMap:" +echo " kubectl delete configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" + diff --git a/byoc-examples/features/table-topic/github-event/notebook/deployment.yaml b/byoc-examples/features/table-topic/github-event/notebook/deployment.yaml new file mode 100644 index 0000000..6ca8fd4 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/deployment.yaml @@ -0,0 +1,82 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: github-event-notebook + namespace: default + labels: + app: github-event-notebook + component: notebook +spec: + replicas: 1 + selector: + matchLabels: + app: github-event-notebook + template: + metadata: + labels: + app: github-event-notebook + component: notebook + spec: + # Use producer node group if available + nodeSelector: + node-type: producer + containers: + - name: notebook + image: python:3.9 + command: + - sh + - -c + - | + echo "Installing Python dependencies..." + pip install --no-cache-dir -r /app/requirements.txt + echo "Starting marimo notebook server..." 
+ marimo run /app/analysis.py --host 0.0.0.0 --port 8000 + ports: + - containerPort: 8000 + name: http + protocol: TCP + env: + - name: AWS_REGION + value: "us-east-1" # Default region, can be overridden + - name: ATHENA_WORKGROUP + value: "primary" # Default workgroup, can be overridden + - name: ATHENA_DATABASE + value: "event_data" # Database name from terraform + - name: ATHENA_S3_OUTPUT + value: "" # Will be set from terraform output if needed + - name: PYTHONUNBUFFERED + value: "1" + # AWS credentials will be provided via IAM role (IRSA) or environment variables + resources: + requests: + memory: "1Gi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "2000m" + volumeMounts: + - name: app-code + mountPath: /app + readOnly: true + livenessProbe: + httpGet: + path: / + port: 8000 + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + readinessProbe: + httpGet: + path: / + port: 8000 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + volumes: + - name: app-code + configMap: + name: github-event-notebook-code + restartPolicy: Always + diff --git a/byoc-examples/features/table-topic/github-event/notebook/requirements.txt b/byoc-examples/features/table-topic/github-event/notebook/requirements.txt new file mode 100644 index 0000000..dc5a7ed --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/requirements.txt @@ -0,0 +1,3 @@ +marimo +awswrangler +pandas diff --git a/byoc-examples/features/table-topic/github-event/notebook/service.yaml b/byoc-examples/features/table-topic/github-event/notebook/service.yaml new file mode 100644 index 0000000..8d2af75 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: github-event-notebook + namespace: default + labels: + app: github-event-notebook + component: notebook +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: 8000 + protocol: TCP + name: http + selector: + app: github-event-notebook + diff --git a/byoc-examples/features/table-topic/github-event/terraform/eks.tf b/byoc-examples/features/table-topic/github-event/terraform/eks.tf new file mode 100644 index 0000000..d5dd62f --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/eks.tf @@ -0,0 +1,37 @@ +module "eks-env" { + source = "../../../setup/kubernetes/aws/terraform" + + region = var.region + resource_suffix = local.resource_suffix + + node_group = var.node_group +} + +resource "aws_vpc_security_group_ingress_rule" "automq_console_ingress_rule" { + description = "Allow inbound traffic from security group of AutoMQ Console" + from_port = 0 + to_port = 65535 + ip_protocol = "tcp" + security_group_id = module.eks-env.eks_cluster_security_group + referenced_security_group_id = module.automq-byoc.automq_byoc_security_group_id + + depends_on = [module.automq-byoc, module.eks-env] +} + +resource "aws_eks_access_entry" "cluster_admins" { + cluster_name = module.eks-env.cluster_name + principal_arn = module.automq-byoc.automq_byoc_console_role_arn + kubernetes_groups = [] + type = "STANDARD" + depends_on = [module.automq-byoc, module.eks-env] +} + +resource "aws_eks_access_policy_association" "cluster_admins" { + cluster_name = module.eks-env.cluster_name + policy_arn = "arn:aws:eks::aws:cluster-access-policy/AmazonEKSClusterAdminPolicy" + principal_arn = module.automq-byoc.automq_byoc_console_role_arn + access_scope { + type = "cluster" + } + depends_on = 
[module.automq-byoc, module.eks-env] +} diff --git a/byoc-examples/features/table-topic/github-event/terraform/main.tf b/byoc-examples/features/table-topic/github-event/terraform/main.tf new file mode 100644 index 0000000..ae007aa --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/main.tf @@ -0,0 +1,12 @@ +module "automq-byoc" { + source = "AutoMQ/automq-byoc-environment/aws" + version = "0.3.2" + + cloud_provider_region = var.region + automq_byoc_env_id = local.resource_suffix + + create_new_vpc = false + automq_byoc_vpc_id = module.eks-env.vpc_id + automq_byoc_env_console_public_subnet_id = module.eks-env.public_subnets[0] +} + diff --git a/byoc-examples/features/table-topic/github-event/terraform/outputs.tf b/byoc-examples/features/table-topic/github-event/terraform/outputs.tf new file mode 100644 index 0000000..cd14273 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/outputs.tf @@ -0,0 +1,57 @@ +output "console_endpoint" { + description = "Console endpoint for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_endpoint +} + +output "initial_username" { + description = "Initial username for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_initial_username +} + +output "initial_password" { + description = "Initial password for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_initial_password +} + +output "dns_zone_id" { + description = "Route53 DNS Zone ID for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_vpc_route53_zone_id +} + +output "data_bucket" { + description = "Data bucket name for the AutoMQ BYOC environment" + value = "automq-data-${module.automq-byoc.automq_byoc_env_id}" +} + +output "cluster_name" { + description = "Name of the EKS cluster" + value = module.eks-env.cluster_name +} + +output "region" { + description = "AWS region where resources are deployed" + value = var.region +} + +output "vpc_id" { + description = "VPC ID used by the EKS environment" + value = module.eks-env.vpc_id +} + +output "default_az" { + description = "Selected availability zone (first private subnet AZ)" + value = module.eks-env.azs[0] + +} + +output "automq_environment_id" { + description = "AutoMQ Environment ID used for BYOC" + value = module.automq-byoc.automq_byoc_env_id +} + +output "node_group_instance_profile_arn" { + description = "ARN of the EKS Node Group" + value = module.eks-env.node_group_instance_profile_arn +} + + diff --git a/byoc-examples/features/table-topic/github-event/terraform/producer.tf b/byoc-examples/features/table-topic/github-event/terraform/producer.tf new file mode 100644 index 0000000..d607218 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/producer.tf @@ -0,0 +1,39 @@ +# Producer Node Group +resource "aws_eks_node_group" "producer_node_group" { + count = 1 + cluster_name = module.eks-env.cluster_name + node_group_name = "producer-node-group-${local.resource_suffix}" + node_role_arn = module.eks-env.node_role_arn + + # Use the same subnet as the default node group (single AZ for cost optimization) + subnet_ids = slice(module.eks-env.private_subnets, 0, 1) + + scaling_config { + desired_size = var.producer_desired_size + max_size = var.producer_max_size + min_size = var.producer_min_size + } + + capacity_type = var.producer_capacity_type + instance_types = var.producer_instance_types + ami_type = var.producer_ami_type + disk_size = var.producer_disk_size + + labels = merge( + { + "node-type" = 
"producer" + "workload-type" = "producer" + } + ) + + tags = merge( + { + Name = "producer-node-group-${local.resource_suffix}" + } + ) + + # Ensure that IAM Role permissions are created before and deleted after EKS Node Group handling. + depends_on = [ + module.eks-env + ] +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/terraform/providers.tf b/byoc-examples/features/table-topic/github-event/terraform/providers.tf new file mode 100644 index 0000000..4aa29dd --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/providers.tf @@ -0,0 +1,62 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = " > 5.0.0" + } + random = { + source = "hashicorp/random" + version = ">= 3.5.0" + } + helm = { + source = "hashicorp/helm" + version = "~> 2.0" + } + kubernetes = { + source = "hashicorp/kubernetes" + version = "~> 2.0" + } + } +} + +# Configure AWS Provider +provider "aws" { + region = var.region +} + +# Configure Kubernetes provider to connect to the EKS cluster +provider "kubernetes" { + host = module.eks-env.eks_cluster_endpoint + cluster_ca_certificate = base64decode(module.eks-env.eks_cluster_ca_certificate) + + # Use AWS CLI to obtain EKS token dynamically + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + args = [ + "eks", + "get-token", + "--cluster-name", + module.eks-env.cluster_name, + ] + } +} + +# Configure Helm provider using the same Kubernetes connection +provider "helm" { + kubernetes { + host = module.eks-env.eks_cluster_endpoint + cluster_ca_certificate = base64decode(module.eks-env.eks_cluster_ca_certificate) + + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + args = [ + "eks", + "get-token", + "--cluster-name", + module.eks-env.cluster_name, + ] + } + } +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf b/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf new file mode 100644 index 0000000..7874366 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf @@ -0,0 +1,30 @@ +resource "aws_s3tables_table_bucket" "event_table_bucket" { + name = "${local.resource_suffix}-event-analytics-bucket" +} + +resource "aws_s3tables_namespace" "event_namespace" { + table_bucket_arn = aws_s3tables_table_bucket.event_table_bucket.arn + namespace = "event_data" +} + +output "s3_table_bucket_arn" { + description = "S3 Table Bucket ARN" + value = aws_s3tables_table_bucket.event_table_bucket.arn +} + +resource "aws_s3_bucket" "athena_results" { + bucket = "${local.resource_suffix}-athena-query-results" + force_destroy = true +} + + +resource "aws_athena_workgroup" "lab-athena-workgroup" { + name = "${local.resource_suffix}-primary" # č¦†ē›–é»˜č®¤ēš„ primary å·„ä½œē»„ + force_destroy = true + + configuration { + result_configuration { + output_location = "s3://${aws_s3_bucket.athena_results.bucket}/output/" + } + } +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/terraform/variables.tf b/byoc-examples/features/table-topic/github-event/terraform/variables.tf new file mode 100644 index 0000000..294a43e --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/variables.tf @@ -0,0 +1,96 @@ + +variable "region" { + description = "AWS region" + type = string + default = "us-east-1" +} + +variable "resource_suffix" { + description = "Suffix for resource names" + type = 
string + default = "automqlab" +} + +resource "random_string" "resource_suffix" { + length = 4 + upper = false + lower = true + numeric = true + special = false +} + +locals { + # Append a dash and a 4-char random tail to the configured suffix + resource_suffix = "${var.resource_suffix}-${random_string.resource_suffix.result}" +} + +variable "node_group" { + description = "Configuration for EKS node group" + type = object({ + name = string + ami_type = string + instance_type = string + desired_size = number + max_size = number + min_size = number + }) + default = { + name = "automq-node-group" + desired_size = 4 # Desired number of nodes + max_size = 10 # Maximum number of nodes + min_size = 3 # Minimum number of nodes + instance_type = "c6g.2xlarge" # Compute-optimized instance with AWS Graviton2 processor + ami_type = "AL2_ARM_64" # Amazon Linux 2 AMI type, can use AL2_ARM_64 for ARM architecture + } +} + + +# Producer Node Group Configuration +variable "enable_producer_nodes" { + description = "Whether to create producer node group" + type = bool + default = true +} + +# Producer node group scaling configuration +variable "producer_capacity_type" { + description = "Type of capacity associated with the producer EKS Node Group. Valid values: ON_DEMAND, SPOT" + type = string + default = "ON_DEMAND" +} + +variable "producer_instance_types" { + description = "List of instance types for the producer node group - configured for at least 4c8g" + type = list(string) + default = ["c5.xlarge", "c5a.xlarge", "c5n.xlarge", "m5.xlarge", "m5a.xlarge"] +} + +variable "producer_desired_size" { + description = "Desired number of producer nodes" + type = number + default = 1 +} + +variable "producer_max_size" { + description = "Maximum number of producer nodes" + type = number + default = 2 +} + +variable "producer_min_size" { + description = "Minimum number of producer nodes" + type = number + default = 1 +} + +variable "producer_ami_type" { + description = "Type of Amazon Machine Image (AMI) associated with the producer EKS Node Group" + type = string + default = "AL2023_x86_64_STANDARD" +} + +variable "producer_disk_size" { + description = "Disk size in GiB for producer worker nodes" + type = number + default = 50 +} diff --git a/byoc-examples/features/table-topic/policy-eks/terraform/eks.tf b/byoc-examples/features/table-topic/policy-eks/terraform/eks.tf new file mode 100644 index 0000000..2e6cec9 --- /dev/null +++ b/byoc-examples/features/table-topic/policy-eks/terraform/eks.tf @@ -0,0 +1,6 @@ +module "eks-env" { + source = "../../../../setup/kubernetes/aws/terraform" + + region = var.region + resource_suffix = local.resource_suffix +} diff --git a/byoc-examples/features/table-topic/policy-eks/terraform/providers.tf b/byoc-examples/features/table-topic/policy-eks/terraform/providers.tf new file mode 100644 index 0000000..b0364cf --- /dev/null +++ b/byoc-examples/features/table-topic/policy-eks/terraform/providers.tf @@ -0,0 +1,25 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = " > 5.0.0" + } + random = { + source = "hashicorp/random" + version = ">= 3.5.0" + } + helm = { + source = "hashicorp/helm" + version = "~> 2.0" + } + kubernetes = { + source = "hashicorp/kubernetes" + version = "~> 2.0" + } + } +} + +# Configure AWS Provider +provider "aws" { + region = var.region +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/policy-eks/terraform/variables.tf b/byoc-examples/features/table-topic/policy-eks/terraform/variables.tf new file 
mode 100644 index 0000000..f87a911 --- /dev/null +++ b/byoc-examples/features/table-topic/policy-eks/terraform/variables.tf @@ -0,0 +1,76 @@ + +variable "region" { + description = "AWS region" + type = string + default = "us-east-1" +} + +variable "resource_suffix" { + description = "Suffix for resource names" + type = string + default = "automqlab" +} + +resource "random_string" "resource_suffix" { + length = 4 + upper = false + lower = true + numeric = true + special = false +} + +locals { + # Append a dash and a 4-char random tail to the configured suffix + resource_suffix = "${var.resource_suffix}-${random_string.resource_suffix.result}" +} + + +# Producer Node Group Configuration +variable "enable_producer_nodes" { + description = "Whether to create producer node group" + type = bool + default = true +} + +# Producer node group scaling configuration +variable "producer_capacity_type" { + description = "Type of capacity associated with the producer EKS Node Group. Valid values: ON_DEMAND, SPOT" + type = string + default = "ON_DEMAND" +} + +variable "producer_instance_types" { + description = "List of instance types for the producer node group - configured for at least 4c8g" + type = list(string) + default = ["c5.xlarge", "c5a.xlarge", "c5n.xlarge", "m5.xlarge", "m5a.xlarge"] +} + +variable "producer_desired_size" { + description = "Desired number of producer nodes" + type = number + default = 1 +} + +variable "producer_max_size" { + description = "Maximum number of producer nodes" + type = number + default = 2 +} + +variable "producer_min_size" { + description = "Minimum number of producer nodes" + type = number + default = 1 +} + +variable "producer_ami_type" { + description = "Type of Amazon Machine Image (AMI) associated with the producer EKS Node Group" + type = string + default = "AL2023_x86_64_STANDARD" +} + +variable "producer_disk_size" { + description = "Disk size in GiB for producer worker nodes" + type = number + default = 50 +} diff --git a/byoc-examples/setup/kubernetes/aws/terraform/main.tf b/byoc-examples/setup/kubernetes/aws/terraform/main.tf index 60efdd4..da9789a 100644 --- a/byoc-examples/setup/kubernetes/aws/terraform/main.tf +++ b/byoc-examples/setup/kubernetes/aws/terraform/main.tf @@ -54,7 +54,7 @@ resource "aws_eks_node_group" "automq-node-groups" { subnet_ids = slice(module.network.private_subnets, 0, 1) ami_type = local.node_group.ami_type - capacity_type = "ON_DEMAND" # Use On-Demand instances, can switch to "SPOT" for cost savings + capacity_type = "SPOT" # Use Spot instances for cost savings, can switch to "ON_DEMAND" for guaranteed capacity instance_types = [local.node_group.instance_type] # Node group auto-scaling configuration From 0deb3fe2a9ff19fdf88e45491c8e8dcbe71b0aee Mon Sep 17 00:00:00 2001 From: lyx2000 <1419360299@qq.com> Date: Wed, 17 Dec 2025 11:50:21 +0800 Subject: [PATCH 2/5] Optimize the event producer logic --- .../table-topic/github-event/event-producer/event_producer.py | 2 ++ .../features/table-topic/github-event/terraform/eks.tf | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py index c6d1009..82a3f42 100644 --- a/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py +++ b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py @@ -145,6 +145,8 @@ def delivery_report(err, msg): # Call poll every 100 messages to ensure timely 
delivery if count % 100 == 0: producer.poll(0) + # Sleep to spread events evenly: every 100 events sleep 3.6s (100k events in 1 hour) + time.sleep(3.6) # Print progress every 2000 messages if count % 2000 == 0: logger.info(f" ↳ Sent {count} events...") diff --git a/byoc-examples/features/table-topic/github-event/terraform/eks.tf b/byoc-examples/features/table-topic/github-event/terraform/eks.tf index d5dd62f..4551e15 100644 --- a/byoc-examples/features/table-topic/github-event/terraform/eks.tf +++ b/byoc-examples/features/table-topic/github-event/terraform/eks.tf @@ -1,5 +1,5 @@ module "eks-env" { - source = "../../../setup/kubernetes/aws/terraform" + source = "../../../../setup/kubernetes/aws/terraform" region = var.region resource_suffix = local.resource_suffix From 0fc93aeb9d8ce1ece3555bdcce58f8ecbb2128c1 Mon Sep 17 00:00:00 2001 From: lyx2000 <1419360299@qq.com> Date: Tue, 9 Dec 2025 10:16:20 +0800 Subject: [PATCH 3/5] init: github event byoc demo --- .../table-topic/github-event/README.md | 0 .../event-producer/create-configmap.sh | 30 ++ .../event-producer/deployment.yaml | 82 +++++ .../event-producer/event_producer.py | 305 ++++++++++++++++++ .../event-producer/github_event.avsc | 14 + .../event-producer/requirements.txt | 5 + .../table-topic/github-event/justfile | 0 .../github-event/notebook/analysis.py | 281 ++++++++++++++++ .../github-event/notebook/create-configmap.sh | 29 ++ .../github-event/notebook/deployment.yaml | 82 +++++ .../github-event/notebook/requirements.txt | 3 + .../github-event/notebook/service.yaml | 18 ++ .../table-topic/github-event/terraform/eks.tf | 37 +++ .../github-event/terraform/main.tf | 12 + .../github-event/terraform/outputs.tf | 57 ++++ .../github-event/terraform/producer.tf | 39 +++ .../github-event/terraform/providers.tf | 62 ++++ .../github-event/terraform/s3_table.tf | 30 ++ .../github-event/terraform/variables.tf | 96 ++++++ .../table-topic/policy-eks/terraform/eks.tf | 6 + .../policy-eks/terraform/providers.tf | 25 ++ .../policy-eks/terraform/variables.tf | 76 +++++ .../setup/kubernetes/aws/terraform/main.tf | 2 +- 23 files changed, 1290 insertions(+), 1 deletion(-) create mode 100644 byoc-examples/features/table-topic/github-event/README.md create mode 100755 byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/event_producer.py create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc create mode 100644 byoc-examples/features/table-topic/github-event/event-producer/requirements.txt create mode 100644 byoc-examples/features/table-topic/github-event/justfile create mode 100644 byoc-examples/features/table-topic/github-event/notebook/analysis.py create mode 100755 byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh create mode 100644 byoc-examples/features/table-topic/github-event/notebook/deployment.yaml create mode 100644 byoc-examples/features/table-topic/github-event/notebook/requirements.txt create mode 100644 byoc-examples/features/table-topic/github-event/notebook/service.yaml create mode 100644 byoc-examples/features/table-topic/github-event/terraform/eks.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/main.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/outputs.tf create mode 100644 
byoc-examples/features/table-topic/github-event/terraform/producer.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/providers.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/s3_table.tf create mode 100644 byoc-examples/features/table-topic/github-event/terraform/variables.tf create mode 100644 byoc-examples/features/table-topic/policy-eks/terraform/eks.tf create mode 100644 byoc-examples/features/table-topic/policy-eks/terraform/providers.tf create mode 100644 byoc-examples/features/table-topic/policy-eks/terraform/variables.tf diff --git a/byoc-examples/features/table-topic/github-event/README.md b/byoc-examples/features/table-topic/github-event/README.md new file mode 100644 index 0000000..e69de29 diff --git a/byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh b/byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh new file mode 100755 index 0000000..77a7498 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/create-configmap.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Script to create ConfigMap from source files + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PRODUCER_DIR="$(cd "${SCRIPT_DIR}" && pwd)" +CONFIGMAP_NAME="github-event-producer-code" +NAMESPACE="default" + +echo "Creating ConfigMap ${CONFIGMAP_NAME} from source files..." +echo "Source directory: ${PRODUCER_DIR}" +echo "" + +# Create ConfigMap from source files +kubectl create configmap ${CONFIGMAP_NAME} \ + --from-file="${PRODUCER_DIR}/event_producer.py" \ + --from-file="${PRODUCER_DIR}/requirements.txt" \ + --from-file="${PRODUCER_DIR}/github_event.avsc" \ + --namespace=${NAMESPACE} \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "āœ… ConfigMap ${CONFIGMAP_NAME} created/updated successfully!" +echo "" +echo "To view the ConfigMap:" +echo " kubectl get configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" +echo "" +echo "To delete the ConfigMap:" +echo " kubectl delete configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" + diff --git a/byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml b/byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml new file mode 100644 index 0000000..395aba1 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/deployment.yaml @@ -0,0 +1,82 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: github-event-producer + namespace: default + labels: + app: github-event-producer + component: producer +spec: + replicas: 1 + selector: + matchLabels: + app: github-event-producer + template: + metadata: + labels: + app: github-event-producer + component: producer + spec: + # Use producer node group if available + nodeSelector: + node-type: producer + containers: + - name: event-producer + image: python:3.9 + command: + - sh + - -c + - | + echo "Installing Python dependencies..." + pip install --no-cache-dir -r /app/requirements.txt + echo "Starting event producer..." 
+ python /app/event_producer.py + env: + - name: KAFKA_BOOTSTRAP_SERVERS + value: "kf-pfiq444nipaz43tk.automqlab-k3f3.automq.private:9092" + - name: SCHEMA_REGISTRY_URL + value: "http://kf-pfiq444nipaz43tk.automqlab-k3f3.automq.private:8081" + - name: TOPIC_NAME + value: "github_events_iceberg" + - name: LOG_LEVEL + value: "INFO" # Options: DEBUG, INFO, WARNING, ERROR + - name: KAFKA_DEBUG + value: "" # Set to 'all' or 'broker,topic,msg' for debug logs + - name: PYTHONUNBUFFERED + value: "1" + resources: + requests: + memory: "512Mi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "2000m" + volumeMounts: + - name: app-code + mountPath: /app + readOnly: true + livenessProbe: + exec: + command: + - /bin/sh + - -c + - "ps aux | grep -v grep | grep event_producer.py || exit 1" + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + readinessProbe: + exec: + command: + - /bin/sh + - -c + - "ps aux | grep -v grep | grep event_producer.py || exit 1" + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + volumes: + - name: app-code + configMap: + name: github-event-producer-code + restartPolicy: Always diff --git a/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py new file mode 100644 index 0000000..c6d1009 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py @@ -0,0 +1,305 @@ +import json +import gzip +import shutil +import os +import sys +import time +import logging +import requests +import dateutil.parser +from datetime import datetime, timedelta, timezone +from confluent_kafka import Producer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroSerializer +from confluent_kafka.serialization import SerializationContext, MessageField + +# Configure logging +LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper() +# Create a stream handler with unbuffered output +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) + +# Configure root logger +root_logger = logging.getLogger() +root_logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) +root_logger.handlers = [] # Clear existing handlers +root_logger.addHandler(handler) + +logger = logging.getLogger(__name__) + +# ================= Configuration ================= +KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'automq:9092') +SCHEMA_REGISTRY_URL = os.getenv('SCHEMA_REGISTRY_URL', 'http://schema-registry:8081') +TOPIC_NAME = os.getenv('TOPIC_NAME', 'github_events_iceberg') +KAFKA_DEBUG = os.getenv('KAFKA_DEBUG', '') # Set to 'all' or 'broker,topic,msg' for debug logs + +# Read Avro Schema +schema_path = os.path.join(os.path.dirname(__file__), 'github_event.avsc') +with open(schema_path, 'r') as f: + SCHEMA_STR = f.read() + +class GithubEventMapper: + """Maps complex GH JSON to flat Avro dictionary""" + + @staticmethod + def to_timestamp(time_str): + if not time_str: return None + dt = dateutil.parser.parse(time_str) + return int(dt.timestamp() * 1000) # Convert to milliseconds + + @staticmethod + def map_event(raw): + t = raw.get('type') + + # 1. 
Basic fields + record = { + "id": raw.get("id"), + "type": t, + "created_at": GithubEventMapper.to_timestamp(raw.get("created_at")), + "actor_login": raw.get("actor", {}).get("login"), + "repo_name": raw.get("repo", {}).get("name") + } + + return record + +def format_hour_key(dt): + """Format datetime to GH Archive filename format: YYYY-MM-DD-H (single digit hour)""" + return f"{dt.strftime('%Y-%m-%d')}-{dt.hour}" + + +def get_initial_start_hour(): + """Calculate initial start time: 3 days ago (UTC)""" + now = datetime.now(timezone.utc) + start = now - timedelta(days=3) + # Round down to hour + return start.replace(minute=0, second=0, microsecond=0) + + +def get_target_hour(): + """Calculate target time: 3 hours ago (UTC)""" + now = datetime.now(timezone.utc) + target = now - timedelta(hours=3) + # Round down to hour + return target.replace(minute=0, second=0, microsecond=0) + + +def process_hour(date_str, hour, producer, avro_serializer): + """Process data for a single hour""" + start_time = time.time() + delivery_success = 0 + delivery_failed = 0 + + def delivery_report(err, msg): + nonlocal delivery_success, delivery_failed + if err: + delivery_failed += 1 + logger.error(f"āŒ Delivery failed: {err}") + else: + delivery_success += 1 + # Log detailed delivery info only at debug level + if logger.level == logging.DEBUG: + logger.debug(f"āœ… Message delivered: topic={msg.topic()}, partition={msg.partition()}, offset={msg.offset()}") + + # Download & Process + file_name = f"{date_str}-{hour}.json.gz" + url = f"https://data.gharchive.org/{file_name}" + logger.info(f"šŸ“„ Processing: {url}") + + try: + # Stream download and decompress + response = requests.get(url, stream=True, timeout=300) + response.raise_for_status() + + with open(file_name, 'wb') as f: + shutil.copyfileobj(response.raw, f) + + count = 0 + with gzip.open(file_name, 'rt', encoding='utf-8') as f: + for line in f: + if not line.strip(): + continue + + try: + raw_json = json.loads(line) + # Core transformation + avro_record = GithubEventMapper.map_event(raw_json) + + # Send message, wait if queue is full + while True: + try: + producer.produce( + topic=TOPIC_NAME, + key=str(avro_record['id']), + value=avro_serializer(avro_record, SerializationContext(TOPIC_NAME, MessageField.VALUE)), + on_delivery=delivery_report + ) + break # Successfully sent, exit retry loop + except BufferError: + # Queue is full, process sent messages first, then retry + producer.poll(1) + continue + + count += 1 + # Call poll every 100 messages to ensure timely delivery + if count % 100 == 0: + producer.poll(0) + # Print progress every 2000 messages + if count % 2000 == 0: + logger.info(f" ↳ Sent {count} events...") + + except Exception as e: + logger.warning(f" āš ļø Error processing line: {e}") + continue + + # Flush remaining messages + logger.debug("Flushing producer...") + producer.flush(timeout=30) + + elapsed = time.time() - start_time + rate = count / elapsed if elapsed > 0 else 0 + logger.info(f"āœ… Completed: {count} events sent from {url} in {elapsed:.2f}s ({rate:.0f} events/sec)") + logger.info(f" Delivery stats: {delivery_success} successful, {delivery_failed} failed") + return count + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + logger.warning(f"āš ļø Data not available yet: {url} (404)") + return 0 + else: + logger.error(f"āŒ HTTP error processing {url}: {e}") + raise + except Exception as e: + logger.error(f"āŒ Error processing {url}: {e}", exc_info=True) + raise + finally: + if 
os.path.exists(file_name): + os.remove(file_name) + + +def run_continuous_producer(): + """Main loop for continuously running producer""" + logger.info("=" * 60) + logger.info("šŸš€ Starting Continuous GH Archive Producer") + logger.info("=" * 60) + + # Setup Kafka (initialize outside loop to avoid repeated creation) + logger.info("šŸ“” Initializing Kafka producer...") + logger.info(f" Bootstrap servers: {KAFKA_BOOTSTRAP_SERVERS}") + logger.info(f" Topic: {TOPIC_NAME}") + logger.info(f" Schema Registry: {SCHEMA_REGISTRY_URL}") + + try: + schema_registry_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL}) + logger.info("āœ… Schema Registry client created") + avro_serializer = AvroSerializer(schema_registry_client, SCHEMA_STR) + logger.info("āœ… Avro serializer created") + except Exception as e: + logger.error(f"āŒ Failed to initialize Schema Registry: {e}", exc_info=True) + raise + + # Configure Producer: increase queue size for higher throughput + producer_config = { + 'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS, + 'queue.buffering.max.messages': 100000, # Increase queue size + 'queue.buffering.max.kbytes': 1048576, # 1GB + 'batch.num.messages': 10000, # Batch size + 'log.connection.close': True, # Log connection close events + } + + # Add debug configuration if specified + if KAFKA_DEBUG: + producer_config['debug'] = KAFKA_DEBUG + logger.info(f"šŸ” Kafka debug enabled: {KAFKA_DEBUG}") + + try: + producer = Producer(producer_config) + logger.info("āœ… Kafka producer created") + + # Test connection + logger.info("šŸ”Œ Testing Kafka connection...") + metadata = producer.list_topics(timeout=10) + logger.info(f"āœ… Connected to Kafka. Available topics: {len(metadata.topics)}") + if TOPIC_NAME in metadata.topics: + topic_metadata = metadata.topics[TOPIC_NAME] + logger.info(f"āœ… Topic '{TOPIC_NAME}' exists with {len(topic_metadata.partitions)} partitions") + else: + logger.warning(f"āš ļø Topic '{TOPIC_NAME}' not found! 
It may be created automatically.") + except Exception as e: + logger.error(f"āŒ Failed to connect to Kafka: {e}", exc_info=True) + raise + + # Use variable to track last processed hour (stateless operation) + last_processed_hour = None + + # Determine start time + if last_processed_hour is None: + # First run, start from 3 days ago + current_hour = get_initial_start_hour() + logger.info(f"šŸ†• Starting from {format_hour_key(current_hour)} (3 days ago)") + else: + # Resume from last processed time point + current_hour = last_processed_hour + timedelta(hours=1) + logger.info(f"šŸ”„ Resuming from {format_hour_key(current_hour)}") + + cycle_count = 0 + while True: + cycle_count += 1 + logger.info("") + logger.info("=" * 60) + logger.info(f"šŸ“Š Cycle #{cycle_count} - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}") + logger.info("=" * 60) + + target_hour = get_target_hour() + logger.info(f"šŸŽÆ Target hour: {format_hour_key(target_hour)} (3 hours ago)") + logger.info(f"šŸ“ Current hour: {format_hour_key(current_hour)}") + + # Process all hours that need processing + processed_in_cycle = 0 + while current_hour < target_hour: + date_str = current_hour.strftime("%Y-%m-%d") + hour_str = str(current_hour.hour) # Single digit hour (0-23) + + try: + count = process_hour(date_str, hour_str, producer, avro_serializer) + if count > 0: + processed_in_cycle += 1 + # After successful processing, update variable and move to next hour + last_processed_hour = current_hour + current_hour += timedelta(hours=1) + else: + # Data not available (possibly 404), wait before retrying + logger.info(f"ā³ Waiting 60 seconds before retrying...") + time.sleep(60) + # Don't update current_hour, will retry in next cycle + break + + except Exception as e: + logger.error(f"āŒ Failed to process {format_hour_key(current_hour)}: {e}", exc_info=True) + logger.info(f"ā³ Waiting 60 seconds before retrying...") + time.sleep(60) + # Don't update current_hour, will retry in next cycle + break + + if processed_in_cycle > 0: + logger.info("") + logger.info(f"āœ… Cycle completed: Processed {processed_in_cycle} hours") + else: + logger.info("") + logger.info(f"āøļø No new data to process (caught up to {format_hour_key(target_hour)})") + + # Wait before next check (target time advances with current time) + wait_seconds = 300 # 5 minutes + logger.info(f"ā³ Waiting {wait_seconds} seconds before next cycle...") + time.sleep(wait_seconds) + +if __name__ == "__main__": + # Continuous mode: start from 3 days ago, continuously pull up to 3 hours ago + try: + run_continuous_producer() + except KeyboardInterrupt: + logger.info("\n\nāš ļø Producer stopped by user") + except Exception as e: + logger.error(f"\n\nāŒ Fatal error: {e}", exc_info=True) + raise \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc b/byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc new file mode 100644 index 0000000..c2aa611 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/github_event.avsc @@ -0,0 +1,14 @@ +{ + "type": "record", + "name": "GithubEvent", + "namespace": "com.gharchive", + "doc": "Top-level structure of Github Event", + "fields": [ + { "name": "id", "type": ["null", "string"], "default": null }, + { "name": "type", "type": ["null", "string"], "default": null }, + { "name": "actor_login", "type": ["null", "string"], "default": null }, + { "name": "repo_name", "type": ["null", "string"], "default": null 
}, + { "name": "org_login", "type": ["null", "string"], "default": null }, + { "name": "created_at", "type": { "type": "long", "logicalType": "timestamp-millis" } } + ] +} diff --git a/byoc-examples/features/table-topic/github-event/event-producer/requirements.txt b/byoc-examples/features/table-topic/github-event/event-producer/requirements.txt new file mode 100644 index 0000000..9ae67aa --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/event-producer/requirements.txt @@ -0,0 +1,5 @@ +requests +python-dateutil +confluent-kafka[schema-registry,avro] +attrs>=21.2.0 +fastavro diff --git a/byoc-examples/features/table-topic/github-event/justfile b/byoc-examples/features/table-topic/github-event/justfile new file mode 100644 index 0000000..e69de29 diff --git a/byoc-examples/features/table-topic/github-event/notebook/analysis.py b/byoc-examples/features/table-topic/github-event/notebook/analysis.py new file mode 100644 index 0000000..c0f391d --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/analysis.py @@ -0,0 +1,281 @@ +import marimo + +__generated_with = "0.18.0" +app = marimo.App(width="medium", app_title="GitHub Real-Time Analytics") + + +@app.cell(hide_code=True) +def _(): + # Import required libraries + import marimo as mo + import os + import awswrangler as wr + print("āœ“ Using awswrangler to query Athena") + + # Configure AWS credentials and region from environment variables + try: + print("Configuring AWS credentials...") + aws_region = os.getenv('AWS_REGION', 'us-east-1') + athena_workgroup = os.getenv('ATHENA_WORKGROUP', 'automqlab-k3f3-primary') + athena_database = os.getenv('ATHENA_DATABASE', 'event') + athena_s3_output = os.getenv('ATHENA_S3_OUTPUT', '') + + print(f"āœ“ AWS Region: {aws_region}") + print(f"āœ“ Athena Workgroup: {athena_workgroup}") + print(f"āœ“ Athena Database: {athena_database}") + if athena_s3_output: + print(f"āœ“ Athena S3 Output: {athena_s3_output}") + + # Test Athena connection + print("Testing Athena connection...") + # Set default session configuration + wr.config.s3_endpoint_url = None # Use default S3 endpoint + print("āœ“ awswrangler configured successfully") + + except Exception as e: + error_msg = f"""āŒ Error configuring awswrangler: {str(e)}""" + print(error_msg) + raise RuntimeError(error_msg) from e + + return mo, wr, aws_region, athena_workgroup, athena_database, athena_s3_output + + +@app.cell(hide_code=True) +def _(mo): + import time + + # State management + get_events, set_events = mo.state(value=[]) + get_last_update, set_last_update = mo.state(value="Never") + get_star_count, set_star_count = mo.state(value=0) + get_total_events, set_total_events = mo.state(value=0) + + return (get_events, set_events, get_last_update, set_last_update, get_star_count, set_star_count, get_total_events, set_total_events, time) + + +@app.cell +def _(mo): + mo.md(r""" + # šŸš€ GitHub Events Real-Time Analytics + + This project demonstrates how to leverage **[AutoMQ Table Topic](https://github.com/AutoMQ/automq)** to transform streaming GitHub events into Apache Iceberg format for real-time analytics. + + This solution ingests **GitHub Events** from [GH Archive](https://www.gharchive.org/) into AutoMQ, where the **Table Topic** feature automatically converts the streaming data into Apache Iceberg tables. AWS Athena can then query these tables directly using awswrangler, enabling real-time analysis of open-source community activities without the traditional ETL complexity. 
+ + **Data Source**: [GH Archive](https://www.gharchive.org/) - Public GitHub timeline events + --- + """) + return + +@app.cell +def _(mo): + # Create auto-refresh component, refresh every 60 seconds (1 minute) + dataRefresh = mo.ui.refresh(options=["60s"], default_interval="60s") + return (dataRefresh,) + + +@app.cell(hide_code=True) +def _(dataRefresh, get_events, set_events, get_last_update, set_last_update, get_star_count, set_star_count, get_total_events, set_total_events, wr, athena_workgroup, athena_database, athena_s3_output, time): + # Use mo.ui.refresh to trigger data refresh + # Key: directly use dataRefresh.value to let marimo detect changes and trigger cell re-execution + # Use in SQL query comments to ensure SQL string changes when value changes, triggering re-execution + _refresh_value = dataRefresh.value + + try: + # Configure query parameters + query_params = { + 'database': athena_database, + 'workgroup': athena_workgroup, + } + if athena_s3_output: + query_params['s3_output'] = athena_s3_output + + # 1. Get star count (WatchEvent) for last 3 days + # Note: created_at is timestamp in milliseconds, convert to date for comparison + star_query = f""" + -- Refresh trigger: {_refresh_value} + SELECT COUNT(*) as star_count + FROM {athena_database}.github_events_iceberg + WHERE type = 'WatchEvent' + AND from_unixtime(created_at / 1000) >= current_date - interval '3' day + """ + star_df = wr.athena.read_sql_query( + sql=star_query, + **query_params + ) + star_count = int(star_df.iloc[0]['star_count']) if len(star_df) > 0 else 0 + set_star_count(star_count) + + # 2. Get total event count for last 3 days + # Note: created_at is timestamp in milliseconds, convert to date for comparison + total_query = f""" + -- Refresh trigger: {_refresh_value} + SELECT COUNT(*) as total_count + FROM {athena_database}.github_events_iceberg + WHERE from_unixtime(created_at / 1000) >= current_date - interval '3' day + """ + total_df = wr.athena.read_sql_query( + sql=total_query, + **query_params + ) + total_count = int(total_df.iloc[0]['total_count']) if len(total_df) > 0 else 0 + set_total_events(total_count) + + # 3. 
Get recent event list + # Convert created_at from milliseconds to readable timestamp + recent_query = f""" + -- Refresh trigger: {_refresh_value} + SELECT + id, + type, + actor_login, + repo_name, + from_unixtime(created_at / 1000) as created_at + FROM {athena_database}.github_events_iceberg + ORDER BY created_at DESC + LIMIT 20 + """ + _pandas_df = wr.athena.read_sql_query( + sql=recent_query, + **query_params + ) + + # Update state + set_events(_pandas_df) + + # Update last refresh time + current_time = time.strftime("%H:%M:%S") + set_last_update(current_time) + + print(f"šŸ”„ [Auto-refresh] Data updated at {current_time} - Stars: {star_count}, Total Events: {total_count}, Recent Events: {len(_pandas_df)}") + + except Exception as e: + print(f"āŒ [Auto-refresh] Error refreshing data: {e}") + import traceback + traceback.print_exc() + + # Return _refresh_value to ensure marimo detects changes + return _refresh_value + + +@app.cell(hide_code=True) +def _(wr, athena_workgroup, athena_database, athena_s3_output): + # Get top 10 repositories by star count for last 3 days (no auto-refresh) + try: + query_params = { + 'database': athena_database, + 'workgroup': athena_workgroup, + } + if athena_s3_output: + query_params['s3_output'] = athena_s3_output + + top_repos_query = f""" + SELECT + repo_name, + COUNT(*) as star_count + FROM {athena_database}.github_events_iceberg + WHERE type = 'WatchEvent' + AND from_unixtime(created_at / 1000) >= current_date - interval '3' day + GROUP BY repo_name + ORDER BY star_count DESC + LIMIT 10 + """ + top_repos_pandas = wr.athena.read_sql_query( + sql=top_repos_query, + **query_params + ) + if top_repos_pandas is not None and len(top_repos_pandas) > 0: + print(f"āœ“ Loaded top {len(top_repos_pandas)} repositories") + else: + print("āš ļø No repository data found") + except Exception as e: + print(f"āŒ Error fetching top repos: {e}") + import traceback + traceback.print_exc() + top_repos_pandas = None + + return top_repos_pandas + + +@app.cell +def _(dataRefresh, mo, get_last_update, get_star_count, get_total_events): + # Note: refresh component needs to be rendered to work, so render first then hide + # Or don't hide it, let users see the refresh status + dataRefresh.style({"display": None}) + + # First row: show star count on left, total events on right + stats_row = mo.hstack([ + mo.md(f""" + ### ⭐ Recent Stars (3 days) + **{get_star_count():,}** stars + """), + mo.md(f""" + ### šŸ“Š Total Events (3 days) + **{get_total_events():,}** events + """) + ], justify="space-between") + + mo.vstack([ + mo.md("## šŸ“Š Live GitHub Events Data"), + mo.md(f"*Last updated: {get_last_update()} • Auto-refresh every 60 seconds*"), + stats_row, + dataRefresh # Ensure refresh component is rendered (even if hidden) + ]) + + +@app.cell +def _(mo, top_repos_pandas): + # Display top 10 repositories by star count for last 3 days + if top_repos_pandas is not None and len(top_repos_pandas) > 0: + top_repos_table = mo.ui.table( + top_repos_pandas, + selection=None, + show_column_summaries=False + ) + result = mo.vstack([ + mo.md("### šŸ† Top 10 Repositories by Stars (Last 3 Days)"), + top_repos_table + ]) + else: + result = mo.vstack([ + mo.md("### šŸ† Top 10 Repositories by Stars (Last 3 Days)"), + mo.md("*No data available - Please check if there are WatchEvent records in the database*") + ]) + + result + + +@app.cell +def _(get_events, mo): + events_data = get_events() + # Select only specific columns to display (modify this list as needed) + display_columns = ['id', 'type', 
'actor_login', 'repo_name', 'created_at'] + + + # Filter the DataFrame to show only selected columns + if len(events_data) > 0: + # Check which columns actually exist in the data + available_columns = [col for col in display_columns if col in events_data.columns] + filtered_data = events_data[available_columns] + + print(f"āœ“ Displaying {len(available_columns)} columns: {', '.join(available_columns)}") + else: + filtered_data = events_data + + # Create interactive table with filtered data + table = mo.ui.table( + filtered_data, + selection=None, + show_column_summaries=False + ) + + # Return the vstack as the final expression to display + mo.vstack([ + mo.md("### šŸ“‹ Recent GitHub Events"), + table + ]) + return + + +if __name__ == "__main__": + app.run() diff --git a/byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh b/byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh new file mode 100755 index 0000000..33b5d05 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/create-configmap.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Script to create ConfigMap from source files + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +NOTEBOOK_DIR="$(cd "${SCRIPT_DIR}" && pwd)" +CONFIGMAP_NAME="github-event-notebook-code" +NAMESPACE="default" + +echo "Creating ConfigMap ${CONFIGMAP_NAME} from source files..." +echo "Source directory: ${NOTEBOOK_DIR}" +echo "" + +# Create ConfigMap from source files +kubectl create configmap ${CONFIGMAP_NAME} \ + --from-file="${NOTEBOOK_DIR}/analysis.py" \ + --from-file="${NOTEBOOK_DIR}/requirements.txt" \ + --namespace=${NAMESPACE} \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "āœ… ConfigMap ${CONFIGMAP_NAME} created/updated successfully!" +echo "" +echo "To view the ConfigMap:" +echo " kubectl get configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" +echo "" +echo "To delete the ConfigMap:" +echo " kubectl delete configmap ${CONFIGMAP_NAME} -n ${NAMESPACE}" + diff --git a/byoc-examples/features/table-topic/github-event/notebook/deployment.yaml b/byoc-examples/features/table-topic/github-event/notebook/deployment.yaml new file mode 100644 index 0000000..6ca8fd4 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/deployment.yaml @@ -0,0 +1,82 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: github-event-notebook + namespace: default + labels: + app: github-event-notebook + component: notebook +spec: + replicas: 1 + selector: + matchLabels: + app: github-event-notebook + template: + metadata: + labels: + app: github-event-notebook + component: notebook + spec: + # Use producer node group if available + nodeSelector: + node-type: producer + containers: + - name: notebook + image: python:3.9 + command: + - sh + - -c + - | + echo "Installing Python dependencies..." + pip install --no-cache-dir -r /app/requirements.txt + echo "Starting marimo notebook server..." 
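+ # analysis.py and requirements.txt are mounted at /app from the github-event-notebook-code ConfigMap; marimo listens on port 8000, which the accompanying Service (service.yaml) maps to port 80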
+ marimo run /app/analysis.py --host 0.0.0.0 --port 8000 + ports: + - containerPort: 8000 + name: http + protocol: TCP + env: + - name: AWS_REGION + value: "us-east-1" # Default region, can be overridden + - name: ATHENA_WORKGROUP + value: "primary" # Default workgroup, can be overridden + - name: ATHENA_DATABASE + value: "event_data" # Database name from terraform + - name: ATHENA_S3_OUTPUT + value: "" # Will be set from terraform output if needed + - name: PYTHONUNBUFFERED + value: "1" + # AWS credentials will be provided via IAM role (IRSA) or environment variables + resources: + requests: + memory: "1Gi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "2000m" + volumeMounts: + - name: app-code + mountPath: /app + readOnly: true + livenessProbe: + httpGet: + path: / + port: 8000 + initialDelaySeconds: 60 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 3 + readinessProbe: + httpGet: + path: / + port: 8000 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + volumes: + - name: app-code + configMap: + name: github-event-notebook-code + restartPolicy: Always + diff --git a/byoc-examples/features/table-topic/github-event/notebook/requirements.txt b/byoc-examples/features/table-topic/github-event/notebook/requirements.txt new file mode 100644 index 0000000..dc5a7ed --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/requirements.txt @@ -0,0 +1,3 @@ +marimo +awswrangler +pandas diff --git a/byoc-examples/features/table-topic/github-event/notebook/service.yaml b/byoc-examples/features/table-topic/github-event/notebook/service.yaml new file mode 100644 index 0000000..8d2af75 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/notebook/service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: github-event-notebook + namespace: default + labels: + app: github-event-notebook + component: notebook +spec: + type: ClusterIP + ports: + - port: 80 + targetPort: 8000 + protocol: TCP + name: http + selector: + app: github-event-notebook + diff --git a/byoc-examples/features/table-topic/github-event/terraform/eks.tf b/byoc-examples/features/table-topic/github-event/terraform/eks.tf new file mode 100644 index 0000000..d5dd62f --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/eks.tf @@ -0,0 +1,37 @@ +module "eks-env" { + source = "../../../setup/kubernetes/aws/terraform" + + region = var.region + resource_suffix = local.resource_suffix + + node_group = var.node_group +} + +resource "aws_vpc_security_group_ingress_rule" "automq_console_ingress_rule" { + description = "Allow inbound traffic from security group of AutoMQ Console" + from_port = 0 + to_port = 65535 + ip_protocol = "tcp" + security_group_id = module.eks-env.eks_cluster_security_group + referenced_security_group_id = module.automq-byoc.automq_byoc_security_group_id + + depends_on = [module.automq-byoc, module.eks-env] +} + +resource "aws_eks_access_entry" "cluster_admins" { + cluster_name = module.eks-env.cluster_name + principal_arn = module.automq-byoc.automq_byoc_console_role_arn + kubernetes_groups = [] + type = "STANDARD" + depends_on = [module.automq-byoc, module.eks-env] +} + +resource "aws_eks_access_policy_association" "cluster_admins" { + cluster_name = module.eks-env.cluster_name + policy_arn = "arn:aws:eks::aws:cluster-access-policy/AmazonEKSClusterAdminPolicy" + principal_arn = module.automq-byoc.automq_byoc_console_role_arn + access_scope { + type = "cluster" + } + depends_on = 
[module.automq-byoc, module.eks-env] +} diff --git a/byoc-examples/features/table-topic/github-event/terraform/main.tf b/byoc-examples/features/table-topic/github-event/terraform/main.tf new file mode 100644 index 0000000..ae007aa --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/main.tf @@ -0,0 +1,12 @@ +module "automq-byoc" { + source = "AutoMQ/automq-byoc-environment/aws" + version = "0.3.2" + + cloud_provider_region = var.region + automq_byoc_env_id = local.resource_suffix + + create_new_vpc = false + automq_byoc_vpc_id = module.eks-env.vpc_id + automq_byoc_env_console_public_subnet_id = module.eks-env.public_subnets[0] +} + diff --git a/byoc-examples/features/table-topic/github-event/terraform/outputs.tf b/byoc-examples/features/table-topic/github-event/terraform/outputs.tf new file mode 100644 index 0000000..cd14273 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/outputs.tf @@ -0,0 +1,57 @@ +output "console_endpoint" { + description = "Console endpoint for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_endpoint +} + +output "initial_username" { + description = "Initial username for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_initial_username +} + +output "initial_password" { + description = "Initial password for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_initial_password +} + +output "dns_zone_id" { + description = "Route53 DNS Zone ID for the AutoMQ BYOC environment" + value = module.automq-byoc.automq_byoc_vpc_route53_zone_id +} + +output "data_bucket" { + description = "Data bucket name for the AutoMQ BYOC environment" + value = "automq-data-${module.automq-byoc.automq_byoc_env_id}" +} + +output "cluster_name" { + description = "Name of the EKS cluster" + value = module.eks-env.cluster_name +} + +output "region" { + description = "AWS region where resources are deployed" + value = var.region +} + +output "vpc_id" { + description = "VPC ID used by the EKS environment" + value = module.eks-env.vpc_id +} + +output "default_az" { + description = "Selected availability zone (first private subnet AZ)" + value = module.eks-env.azs[0] + +} + +output "automq_environment_id" { + description = "AutoMQ Environment ID used for BYOC" + value = module.automq-byoc.automq_byoc_env_id +} + +output "node_group_instance_profile_arn" { + description = "ARN of the EKS Node Group" + value = module.eks-env.node_group_instance_profile_arn +} + + diff --git a/byoc-examples/features/table-topic/github-event/terraform/producer.tf b/byoc-examples/features/table-topic/github-event/terraform/producer.tf new file mode 100644 index 0000000..d607218 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/producer.tf @@ -0,0 +1,39 @@ +# Producer Node Group +resource "aws_eks_node_group" "producer_node_group" { + count = 1 + cluster_name = module.eks-env.cluster_name + node_group_name = "producer-node-group-${local.resource_suffix}" + node_role_arn = module.eks-env.node_role_arn + + # Use the same subnet as the default node group (single AZ for cost optimization) + subnet_ids = slice(module.eks-env.private_subnets, 0, 1) + + scaling_config { + desired_size = var.producer_desired_size + max_size = var.producer_max_size + min_size = var.producer_min_size + } + + capacity_type = var.producer_capacity_type + instance_types = var.producer_instance_types + ami_type = var.producer_ami_type + disk_size = var.producer_disk_size + + labels = merge( + { + "node-type" = 
"producer" + "workload-type" = "producer" + } + ) + + tags = merge( + { + Name = "producer-node-group-${local.resource_suffix}" + } + ) + + # Ensure that IAM Role permissions are created before and deleted after EKS Node Group handling. + depends_on = [ + module.eks-env + ] +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/terraform/providers.tf b/byoc-examples/features/table-topic/github-event/terraform/providers.tf new file mode 100644 index 0000000..4aa29dd --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/providers.tf @@ -0,0 +1,62 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = " > 5.0.0" + } + random = { + source = "hashicorp/random" + version = ">= 3.5.0" + } + helm = { + source = "hashicorp/helm" + version = "~> 2.0" + } + kubernetes = { + source = "hashicorp/kubernetes" + version = "~> 2.0" + } + } +} + +# Configure AWS Provider +provider "aws" { + region = var.region +} + +# Configure Kubernetes provider to connect to the EKS cluster +provider "kubernetes" { + host = module.eks-env.eks_cluster_endpoint + cluster_ca_certificate = base64decode(module.eks-env.eks_cluster_ca_certificate) + + # Use AWS CLI to obtain EKS token dynamically + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + args = [ + "eks", + "get-token", + "--cluster-name", + module.eks-env.cluster_name, + ] + } +} + +# Configure Helm provider using the same Kubernetes connection +provider "helm" { + kubernetes { + host = module.eks-env.eks_cluster_endpoint + cluster_ca_certificate = base64decode(module.eks-env.eks_cluster_ca_certificate) + + exec { + api_version = "client.authentication.k8s.io/v1beta1" + command = "aws" + args = [ + "eks", + "get-token", + "--cluster-name", + module.eks-env.cluster_name, + ] + } + } +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf b/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf new file mode 100644 index 0000000..7874366 --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf @@ -0,0 +1,30 @@ +resource "aws_s3tables_table_bucket" "event_table_bucket" { + name = "${local.resource_suffix}-event-analytics-bucket" +} + +resource "aws_s3tables_namespace" "event_namespace" { + table_bucket_arn = aws_s3tables_table_bucket.event_table_bucket.arn + namespace = "event_data" +} + +output "s3_table_bucket_arn" { + description = "S3 Table Bucket ARN" + value = aws_s3tables_table_bucket.event_table_bucket.arn +} + +resource "aws_s3_bucket" "athena_results" { + bucket = "${local.resource_suffix}-athena-query-results" + force_destroy = true +} + + +resource "aws_athena_workgroup" "lab-athena-workgroup" { + name = "${local.resource_suffix}-primary" # č¦†ē›–é»˜č®¤ēš„ primary å·„ä½œē»„ + force_destroy = true + + configuration { + result_configuration { + output_location = "s3://${aws_s3_bucket.athena_results.bucket}/output/" + } + } +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/github-event/terraform/variables.tf b/byoc-examples/features/table-topic/github-event/terraform/variables.tf new file mode 100644 index 0000000..294a43e --- /dev/null +++ b/byoc-examples/features/table-topic/github-event/terraform/variables.tf @@ -0,0 +1,96 @@ + +variable "region" { + description = "AWS region" + type = string + default = "us-east-1" +} + +variable "resource_suffix" { + description = "Suffix for resource names" + type = 
string + default = "automqlab" +} + +resource "random_string" "resource_suffix" { + length = 4 + upper = false + lower = true + numeric = true + special = false +} + +locals { + # Append a dash and a 4-char random tail to the configured suffix + resource_suffix = "${var.resource_suffix}-${random_string.resource_suffix.result}" +} + +variable "node_group" { + description = "Configuration for EKS node group" + type = object({ + name = string + ami_type = string + instance_type = string + desired_size = number + max_size = number + min_size = number + }) + default = { + name = "automq-node-group" + desired_size = 4 # Desired number of nodes + max_size = 10 # Maximum number of nodes + min_size = 3 # Minimum number of nodes + instance_type = "c6g.2xlarge" # Compute-optimized instance with AWS Graviton2 processor + ami_type = "AL2_ARM_64" # Amazon Linux 2 AMI type, can use AL2_ARM_64 for ARM architecture + } +} + + +# Producer Node Group Configuration +variable "enable_producer_nodes" { + description = "Whether to create producer node group" + type = bool + default = true +} + +# Producer node group scaling configuration +variable "producer_capacity_type" { + description = "Type of capacity associated with the producer EKS Node Group. Valid values: ON_DEMAND, SPOT" + type = string + default = "ON_DEMAND" +} + +variable "producer_instance_types" { + description = "List of instance types for the producer node group - configured for at least 4c8g" + type = list(string) + default = ["c5.xlarge", "c5a.xlarge", "c5n.xlarge", "m5.xlarge", "m5a.xlarge"] +} + +variable "producer_desired_size" { + description = "Desired number of producer nodes" + type = number + default = 1 +} + +variable "producer_max_size" { + description = "Maximum number of producer nodes" + type = number + default = 2 +} + +variable "producer_min_size" { + description = "Minimum number of producer nodes" + type = number + default = 1 +} + +variable "producer_ami_type" { + description = "Type of Amazon Machine Image (AMI) associated with the producer EKS Node Group" + type = string + default = "AL2023_x86_64_STANDARD" +} + +variable "producer_disk_size" { + description = "Disk size in GiB for producer worker nodes" + type = number + default = 50 +} diff --git a/byoc-examples/features/table-topic/policy-eks/terraform/eks.tf b/byoc-examples/features/table-topic/policy-eks/terraform/eks.tf new file mode 100644 index 0000000..2e6cec9 --- /dev/null +++ b/byoc-examples/features/table-topic/policy-eks/terraform/eks.tf @@ -0,0 +1,6 @@ +module "eks-env" { + source = "../../../../setup/kubernetes/aws/terraform" + + region = var.region + resource_suffix = local.resource_suffix +} diff --git a/byoc-examples/features/table-topic/policy-eks/terraform/providers.tf b/byoc-examples/features/table-topic/policy-eks/terraform/providers.tf new file mode 100644 index 0000000..b0364cf --- /dev/null +++ b/byoc-examples/features/table-topic/policy-eks/terraform/providers.tf @@ -0,0 +1,25 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = " > 5.0.0" + } + random = { + source = "hashicorp/random" + version = ">= 3.5.0" + } + helm = { + source = "hashicorp/helm" + version = "~> 2.0" + } + kubernetes = { + source = "hashicorp/kubernetes" + version = "~> 2.0" + } + } +} + +# Configure AWS Provider +provider "aws" { + region = var.region +} \ No newline at end of file diff --git a/byoc-examples/features/table-topic/policy-eks/terraform/variables.tf b/byoc-examples/features/table-topic/policy-eks/terraform/variables.tf new file 
mode 100644 index 0000000..f87a911 --- /dev/null +++ b/byoc-examples/features/table-topic/policy-eks/terraform/variables.tf @@ -0,0 +1,76 @@ + +variable "region" { + description = "AWS region" + type = string + default = "us-east-1" +} + +variable "resource_suffix" { + description = "Suffix for resource names" + type = string + default = "automqlab" +} + +resource "random_string" "resource_suffix" { + length = 4 + upper = false + lower = true + numeric = true + special = false +} + +locals { + # Append a dash and a 4-char random tail to the configured suffix + resource_suffix = "${var.resource_suffix}-${random_string.resource_suffix.result}" +} + + +# Producer Node Group Configuration +variable "enable_producer_nodes" { + description = "Whether to create producer node group" + type = bool + default = true +} + +# Producer node group scaling configuration +variable "producer_capacity_type" { + description = "Type of capacity associated with the producer EKS Node Group. Valid values: ON_DEMAND, SPOT" + type = string + default = "ON_DEMAND" +} + +variable "producer_instance_types" { + description = "List of instance types for the producer node group - configured for at least 4c8g" + type = list(string) + default = ["c5.xlarge", "c5a.xlarge", "c5n.xlarge", "m5.xlarge", "m5a.xlarge"] +} + +variable "producer_desired_size" { + description = "Desired number of producer nodes" + type = number + default = 1 +} + +variable "producer_max_size" { + description = "Maximum number of producer nodes" + type = number + default = 2 +} + +variable "producer_min_size" { + description = "Minimum number of producer nodes" + type = number + default = 1 +} + +variable "producer_ami_type" { + description = "Type of Amazon Machine Image (AMI) associated with the producer EKS Node Group" + type = string + default = "AL2023_x86_64_STANDARD" +} + +variable "producer_disk_size" { + description = "Disk size in GiB for producer worker nodes" + type = number + default = 50 +} diff --git a/byoc-examples/setup/kubernetes/aws/terraform/main.tf b/byoc-examples/setup/kubernetes/aws/terraform/main.tf index 60efdd4..da9789a 100644 --- a/byoc-examples/setup/kubernetes/aws/terraform/main.tf +++ b/byoc-examples/setup/kubernetes/aws/terraform/main.tf @@ -54,7 +54,7 @@ resource "aws_eks_node_group" "automq-node-groups" { subnet_ids = slice(module.network.private_subnets, 0, 1) ami_type = local.node_group.ami_type - capacity_type = "ON_DEMAND" # Use On-Demand instances, can switch to "SPOT" for cost savings + capacity_type = "SPOT" # Use Spot instances for cost savings, can switch to "ON_DEMAND" for stability instance_types = [local.node_group.instance_type] # Node group auto-scaling configuration From df42da258aeb6e8e56ce9741b6e32a08cb7a5360 Mon Sep 17 00:00:00 2001 From: lyx2000 <1419360299@qq.com> Date: Wed, 17 Dec 2025 11:50:21 +0800 Subject: [PATCH 4/5] Optimize the event producer logic --- .../table-topic/github-event/event-producer/event_producer.py | 2 ++ .../features/table-topic/github-event/terraform/eks.tf | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py index c6d1009..82a3f42 100644 --- a/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py +++ b/byoc-examples/features/table-topic/github-event/event-producer/event_producer.py @@ -145,6 +145,8 @@ def delivery_report(err, msg): # Call poll every 100 messages to ensure timely
delivery if count % 100 == 0: producer.poll(0) + # Sleep to spread events evenly: every 100 events sleep 3.6s (100k events in 1 hour) + time.sleep(3.6) # Print progress every 2000 messages if count % 2000 == 0: logger.info(f" ↳ Sent {count} events...") diff --git a/byoc-examples/features/table-topic/github-event/terraform/eks.tf b/byoc-examples/features/table-topic/github-event/terraform/eks.tf index d5dd62f..4551e15 100644 --- a/byoc-examples/features/table-topic/github-event/terraform/eks.tf +++ b/byoc-examples/features/table-topic/github-event/terraform/eks.tf @@ -1,5 +1,5 @@ module "eks-env" { - source = "../../../setup/kubernetes/aws/terraform" + source = "../../../../setup/kubernetes/aws/terraform" region = var.region resource_suffix = local.resource_suffix From 6c12aae157551ea4d991b1c2d5f82ecb22fba8dd Mon Sep 17 00:00:00 2001 From: lyx2000 <1419360299@qq.com> Date: Wed, 17 Dec 2025 11:53:28 +0800 Subject: [PATCH 5/5] fmt terraform code --- .../features/table-topic/github-event/terraform/s3_table.tf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf b/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf index 7874366..4316254 100644 --- a/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf +++ b/byoc-examples/features/table-topic/github-event/terraform/s3_table.tf @@ -13,13 +13,13 @@ output "s3_table_bucket_arn" { } resource "aws_s3_bucket" "athena_results" { - bucket = "${local.resource_suffix}-athena-query-results" + bucket = "${local.resource_suffix}-athena-query-results" force_destroy = true } resource "aws_athena_workgroup" "lab-athena-workgroup" { - name = "${local.resource_suffix}-primary" # Override the default primary workgroup + name = "${local.resource_suffix}-primary" # Override the default primary workgroup force_destroy = true configuration {