save sessions
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"error": "MCP tool 'get_vulnerability_details' must be requested with its server prefix (e.g., 'osvScanner__get_vulnerability_details') in agent 'generalist'."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Directory listing for /Users/bob/git/dxw/aws-stuff:\n[DIR] .ruff_cache\n[DIR] tests\n.env.example (451 bytes)\n.gitignore (76 bytes)\navg_bandwidth_12months.py (4145 bytes)\naws_bills_breakdown.py (3184 bytes)\naws_bills_refined.py (2450 bytes)\naws_bills.py (2964 bytes)\naws_cost.py (2649 bytes)\naws_utils.py (10760 bytes)\naws-asg-lt-ami.py (6715 bytes)\naws-carbon.py (6537 bytes)\naws-ec2-ami.py (4290 bytes)\naws-ips.py (5612 bytes)\naws-metrics.py (5042 bytes)\naws-vpc.py (3098 bytes)\nbwsuage.py (3116 bytes)\ncloudfront.py (3073 bytes)\ncloudtrail-audit.py (2940 bytes)\ndownload_invoice.py (3168 bytes)\nec2_price.py (1491 bytes)\nextract_billing_api.py (3574 bytes)\nextract_invoice_charges.py (5401 bytes)\nGEMINI.md (2922 bytes)\ngenerate-billing-sheets.py (6047 bytes)\niam-audit.py (4855 bytes)\ninspect_billing.py (1398 bytes)\ninspect_mtvh.py (1981 bytes)\nlast-2-months-billing.py (2367 bytes)\nlist_rds_instances.py (1490 bytes)\nlist_services.py (671 bytes)\nnot_running_latest.py (1761 bytes)\nrds_price.py (1396 bytes)\nREADME.md (3830 bytes)\nrequirements.txt (116 bytes)\ns3-security-audit.py (5162 bytes)\nsecurity-group-audit.py (3866 bytes)\nservice-spend-by-account.py (2449 bytes)\nstorage-usage.py (3796 bytes)\nsummarize-ecsv.py (3834 bytes)\ntest_invoice_api.py (2954 bytes)\ntest.py (1985 bytes)\n\n(31 ignored)"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Directory listing for /Users/bob/git/dxw/aws-stuff/tests:\n__init__.py (0 bytes)\ntest_aws_utils.py (7304 bytes)\ntest_scripts.py (2279 bytes)"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nfrom aws_utils import get_account_names, get_last_n_months_ranges, get_boto_session\n\n\ndef get_cost_for_range(ce_client, start_date, end_date, account_costs):\n \"\"\"Fetches costs for a date range and updates account_costs dictionary.\"\"\"\n try:\n response = ce_client.get_cost_and_usage(\n TimePeriod={\"Start\": start_date, \"End\": end_date},\n Granularity=\"MONTHLY\",\n Metrics=[\"UnblendedCost\", \"AmortizedCost\"],\n GroupBy=[{\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"}],\n )\n\n for result in response[\"ResultsByTime\"]:\n for group in result[\"Groups\"]:\n account_id = group[\"Keys\"][0]\n unblended = float(group[\"Metrics\"][\"UnblendedCost\"][\"Amount\"])\n amortized = float(group[\"Metrics\"][\"AmortizedCost\"][\"Amount\"])\n\n if account_id not in account_costs:\n account_costs[account_id] = {\"unblended\": [], \"amortized\": []}\n\n account_costs[account_id][\"unblended\"].append(unblended)\n account_costs[account_id][\"amortized\"].append(amortized)\n except Exception as e:\n print(f\"Error fetching cost for range {start_date} to {end_date}: {e}\")\n\n\ndef main():\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n account_names = get_account_names()\n account_costs = {}\n\n ranges = get_last_n_months_ranges(3)\n months_labels = [r[2] for r in ranges]\n\n for start_date, end_date, _ in ranges:\n get_cost_for_range(ce_client, start_date, end_date, account_costs)\n\n # Sort accounts by the most recent month's amortized cost\n sorted_accounts = sorted(\n account_costs.items(),\n key=lambda item: item[1][\"amortized\"][0] if item[1][\"amortized\"] else 0,\n reverse=True,\n )\n\n print(\n f\"\\nAWS Costs for the last 3 months (Amortized and Unblended, sorted by highest Amortized Cost in {months_labels[0]}):\"\n )\n header = f\"{'Account Name':<30} {' ' + months_labels[0] + ' (Amort)':<15} {' ' + months_labels[1] + ' (Amort)':<15} {' ' + months_labels[2] + ' (Amort)':<15} {' ' + months_labels[0] + ' (Unbl)':<15} {' ' + months_labels[1] + ' (Unbl)':<15} {' ' + months_labels[2] + ' (Unbl)':<15}\"\n print(header)\n print(\"-\" * 125)\n\n for account_id, costs in sorted_accounts:\n name = account_names.get(account_id, f\"Unknown ({account_id})\")\n\n # Ensure we have 3 months of data\n amortized = costs[\"amortized\"] + [0.0] * (3 - len(costs[\"amortized\"]))\n unblended = costs[\"unblended\"] + [0.0] * (3 - len(costs[\"unblended\"]))\n\n amortized_fmt = [f\"${c:,.2f}\" for c in amortized[:3]]\n unblended_fmt = [f\"${c:,.2f}\" for c in unblended[:3]]\n\n print(\n f\"{name[:30]:<30} {amortized_fmt[0]:>15} {amortized_fmt[1]:>15} {amortized_fmt[2]:>15} {unblended_fmt[0]:>15} {unblended_fmt[1]:>15} {unblended_fmt[2]:>15}\"\n )\n\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import unittest\nfrom unittest.mock import patch, MagicMock\nimport os\nimport sys\nimport importlib.util\n\n# Add project root to path\nPROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), \"..\"))\nsys.path.append(PROJECT_ROOT)\n\n\nclass TestScripts(unittest.TestCase):\n def setUp(self):\n # Patch get_boto_session for all tests to avoid real AWS calls\n self.patcher = patch(\"aws_utils.get_boto_session\")\n self.mock_session = self.patcher.start()\n self.mock_client = MagicMock()\n self.mock_session.return_value.client.return_value = self.mock_client\n\n def tearDown(self):\n self.patcher.stop()\n\n def test_import_bwsuage(self):\n \"\"\"Smoke test to ensure bwsuage.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"bwsuage\", os.path.join(PROJECT_ROOT, \"bwsuage.py\")\n )\n bwsuage = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(bwsuage)\n self.assertTrue(hasattr(bwsuage, \"get_total_bandwidth_report\"))\n\n def test_import_aws_bills(self):\n \"\"\"Smoke test to ensure aws_bills.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"aws_bills\", os.path.join(PROJECT_ROOT, \"aws_bills.py\")\n )\n aws_bills = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(aws_bills)\n self.assertTrue(hasattr(aws_bills, \"get_cost_for_range\"))\n\n def test_import_aws_cost(self):\n \"\"\"Smoke test to ensure aws_cost.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"aws_cost\", os.path.join(PROJECT_ROOT, \"aws_cost.py\")\n )\n aws_cost = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(aws_cost)\n self.assertTrue(hasattr(aws_cost, \"fetch_costs_with_savings\"))\n\n def test_import_aws_vpc(self):\n \"\"\"Smoke test to ensure aws-vpc.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"aws_vpc\", os.path.join(PROJECT_ROOT, \"aws-vpc.py\")\n )\n aws_vpc = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(aws_vpc)\n self.assertTrue(hasattr(aws_vpc, \"list_vpcs\"))\n\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nimport csv\nfrom botocore.exceptions import ClientError, ProfileNotFound\nfrom aws_utils import setup_org_accounts_session, get_boto_session, export_to_sheets\n\n\ndef get_vpc_name(tags):\n if tags:\n for tag in tags:\n if tag[\"Key\"] == \"Name\":\n return tag[\"Value\"]\n return \"Unnamed VPC\"\n\n\ndef list_vpcs(ec2_client):\n vpcs_info = []\n paginator = ec2_client.get_paginator(\"describe_vpcs\")\n for page in paginator.paginate():\n for vpc in page[\"Vpcs\"]:\n vpcs_info.append(\n {\n \"VPCName\": get_vpc_name(vpc.get(\"Tags\")),\n \"VPCID\": vpc[\"VpcId\"],\n \"CIDRBlock\": vpc.get(\"CidrBlock\", \"N/A\"),\n }\n )\n return vpcs_info\n\n\ndef main():\n get_boto_session()\n regions_to_check = [\"eu-west-1\", \"eu-west-2\"]\n total_vpcs_all_accounts = 0\n rows_for_sheets = []\n\n with open(\"aws_vpcs_by_region.csv\", mode=\"w\", newline=\"\") as csvfile:\n fieldnames = [\n \"AccountName\",\n \"AccountID\",\n \"Region\",\n \"VPCName\",\n \"VPCID\",\n \"CIDRBlock\",\n ]\n writer = csv.DictWriter(csvfile, fieldnames=fieldnames)\n writer.writeheader()\n\n for account, profile_name in setup_org_accounts_session():\n account_id = account[\"Id\"]\n account_name = account[\"Name\"]\n try:\n boto3.setup_default_session(profile_name=profile_name)\n for region in regions_to_check:\n try:\n ec2_client = boto3.client(\"ec2\", region_name=region)\n vpcs_info = list_vpcs(ec2_client)\n total_vpcs_all_accounts += len(vpcs_info)\n\n for vpc in vpcs_info:\n row = {\n \"AccountName\": account_name,\n \"AccountID\": account_id,\n \"Region\": region,\n \"VPCName\": vpc[\"VPCName\"],\n \"VPCID\": vpc[\"VPCID\"],\n \"CIDRBlock\": vpc[\"CIDRBlock\"],\n }\n writer.writerow(row)\n rows_for_sheets.append(list(row.values()))\n\n if vpcs_info:\n print(\n f'\"{account_name}\" (ID: {account_id}, Region: {region}) VPCs found: {len(vpcs_info)}'\n )\n except ClientError as e:\n print(f\"Error in {region} for {account_name}: {e}\")\n except (ClientError, ProfileNotFound) as e:\n print(f\"Error accessing account {account_name}: {e}\")\n\n print(f\"\\nTotal number of VPCs found: {total_vpcs_all_accounts}\")\n print(\"CSV file 'aws_vpcs_by_region.csv' generated successfully.\")\n\n # Export to Google Sheets\n export_to_sheets(\"aws-vpc\", fieldnames, rows_for_sheets)\n\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "boto3\npandas\npyarrow\npython-dotenv\npython-dateutil\ngspread\ngoogle-auth-oauthlib\ngoogle-auth-httplib2\npypdf\nrequests\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nfrom aws_utils import get_account_names, get_previous_month_range, get_boto_session\n\n\ndef fetch_costs_with_savings(ce_client, org_client, start_date, end_date):\n \"\"\"Fetches costs for EC2 grouped by account and service.\"\"\"\n account_map = get_account_names()\n\n try:\n response = ce_client.get_cost_and_usage(\n TimePeriod={\"Start\": start_date, \"End\": end_date},\n Granularity=\"MONTHLY\",\n Metrics=[\n \"UnblendedCost\",\n \"AmortizedCost\",\n \"NetAmortizedCost\",\n \"NetUnblendedCost\",\n ],\n GroupBy=[\n {\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"},\n {\"Type\": \"DIMENSION\", \"Key\": \"SERVICE\"},\n ],\n )\n\n if not response[\"ResultsByTime\"]:\n print(\"No cost data available for the specified period.\")\n return\n\n for result in response[\"ResultsByTime\"]:\n period_start = result[\"TimePeriod\"][\"Start\"]\n print(f\"Billing Period: {period_start}\")\n\n for group in result[\"Groups\"]:\n account_id = group[\"Keys\"][0]\n service = group[\"Keys\"][1]\n\n if \"Amazon Elastic Compute Cloud\" in service:\n metrics = group[\"Metrics\"]\n unblended = metrics[\"UnblendedCost\"][\"Amount\"]\n amortized = metrics[\"AmortizedCost\"][\"Amount\"]\n net_amortized = metrics[\"NetAmortizedCost\"][\"Amount\"]\n net_unblended = metrics[\"NetUnblendedCost\"][\"Amount\"]\n unit = metrics[\"UnblendedCost\"][\"Unit\"]\n\n account_name = account_map.get(account_id, account_id)\n\n print(f\"Account: {account_name} | Service: {service}\")\n print(f\" - Total Cost (Unblended): {unblended} {unit}\")\n print(f\" - Net Cost (Amortized): {net_amortized} {unit}\")\n print(f\" - Net Cost (Unblended): {net_unblended} {unit}\")\n print(f\" - Final Cost (Amortized): {amortized} {unit}\")\n\n except Exception as e:\n print(f\"Error fetching costs: {e}\")\n\n\ndef main():\n session = get_boto_session()\n ce_client = session.client(\"ce\", region_name=\"us-east-1\")\n org_client = session.client(\"organizations\", region_name=\"us-east-1\")\n\n start_date, end_date = get_previous_month_range()\n\n print(\n f\"Fetching AWS EC2 costs with compute savings from {start_date} to {end_date}...\"\n )\n fetch_costs_with_savings(ce_client, org_client, start_date, end_date)\n\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nfrom aws_utils import get_account_names, get_last_n_months_ranges, get_boto_session\n\n\ndef get_cost_for_range(ce_client, start_date, end_date, account_costs):\n \"\"\"Fetches costs for a date range and updates account_costs dictionary.\"\"\"\n try:\n response = ce_client.get_cost_and_usage(\n TimePeriod={\"Start\": start_date, \"End\": end_date},\n Granularity=\"MONTHLY\",\n Metrics=[\"UnblendedCost\", \"AmortizedCost\"],\n GroupBy=[{\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"}],\n )\n\n for result in response[\"ResultsByTime\"]:\n for group in result[\"Groups\"]:\n account_id = group[\"Keys\"][0]\n unblended = float(group[\"Metrics\"][\"UnblendedCost\"][\"Amount\"])\n amortized = float(group[\"Metrics\"][\"AmortizedCost\"][\"Amount\"])\n\n if account_id not in account_costs:\n account_costs[account_id] = {\"unblended\": [], \"amortized\": []}\n\n account_costs[account_id][\"unblended\"].append(unblended)\n account_costs[account_id][\"amortized\"].append(amortized)\n except Exception as e:\n print(f\"Error fetching cost for range {start_date} to {end_date}: {e}\")\n\n\ndef main():\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n account_names = get_account_names()\n account_costs = {}\n\n ranges = get_last_n_months_ranges(3)\n months_labels = [r[2] for r in ranges]\n\n for start_date, end_date, _ in ranges:\n get_cost_for_range(ce_client, start_date, end_date, account_costs)\n\n # Sort accounts by the most recent month's amortized cost\n sorted_accounts = sorted(\n account_costs.items(),\n key=lambda item: item[1][\"amortized\"][0] if item[1][\"amortized\"] else 0,\n reverse=True,\n )\n\n print(\n f\"\\nAWS Costs for the last 3 months (Amortized and Unblended, sorted by highest Amortized Cost in {months_labels[0]}):\"\n )\n header = f\"{'Account Name':<30} {' ' + months_labels[0] + ' (Amort)':<15} {' ' + months_labels[1] + ' (Amort)':<15} {' ' + months_labels[2] + ' (Amort)':<15} {' ' + months_labels[0] + ' (Unbl)':<15} {' ' + months_labels[1] + ' (Unbl)':<15} {' ' + months_labels[2] + ' (Unbl)':<15}\"\n print(header)\n print(\"-\" * 125)\n\n for account_id, costs in sorted_accounts:\n name = account_names.get(account_id, f\"Unknown ({account_id})\")\n\n # Ensure we have 3 months of data\n amortized = costs[\"amortized\"] + [0.0] * (3 - len(costs[\"amortized\"]))\n unblended = costs[\"unblended\"] + [0.0] * (3 - len(costs[\"unblended\"]))\n\n amortized_fmt = [f\"${c:,.2f}\" for c in amortized[:3]]\n unblended_fmt = [f\"${c:,.2f}\" for c in unblended[:3]]\n\n print(\n f\"{name[:30]:<30} {amortized_fmt[0]:>15} {amortized_fmt[1]:>15} {amortized_fmt[2]:>15} {unblended_fmt[0]:>15} {unblended_fmt[1]:>15} {unblended_fmt[2]:>15}\"\n )\n\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nfrom aws_utils import (\n get_account_names,\n get_previous_month_range,\n get_boto_session,\n export_to_sheets,\n)\n\n\ndef get_total_bandwidth_report(threshold_gb=100.0):\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n all_accounts = get_account_names()\n if not all_accounts:\n return\n\n start_date, end_date = get_previous_month_range()\n account_totals = {acc_id: 0.0 for acc_id in all_accounts.keys()}\n next_token = None\n\n try:\n while True:\n params = {\n \"TimePeriod\": {\"Start\": start_date, \"End\": end_date},\n \"Granularity\": \"MONTHLY\",\n \"Metrics\": [\"UsageQuantity\"],\n \"GroupBy\": [\n {\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"},\n {\"Type\": \"DIMENSION\", \"Key\": \"USAGE_TYPE\"},\n ],\n }\n if next_token:\n params[\"NextPageToken\"] = next_token\n\n response = ce_client.get_cost_and_usage(**params)\n\n for result in response[\"ResultsByTime\"]:\n for group in result[\"Groups\"]:\n acc_id = group[\"Keys\"][0]\n usage_type = group[\"Keys\"][1]\n usage_amount = float(group[\"Metrics\"][\"UsageQuantity\"][\"Amount\"])\n unit = group[\"Metrics\"][\"UsageQuantity\"][\"Unit\"]\n\n if \"DataTransfer\" in usage_type:\n if unit == \"Bytes\":\n usage_amount /= 1024**3\n elif unit in [\"MB\", \"Megabytes\"]:\n usage_amount /= 1024\n\n if acc_id in account_totals:\n account_totals[acc_id] += usage_amount\n else:\n account_totals[acc_id] = usage_amount\n\n next_token = response.get(\"NextPageToken\")\n if not next_token:\n break\n\n print(f\"\\n--- Full Bandwidth Report ({start_date} to {end_date}) ---\")\n print(f\"{'Account Name':<25} | {'Total GB':<12} | {'Overage (>100GB)'}\")\n print(\"-\" * 75)\n\n sorted_accounts = sorted(\n account_totals.items(),\n key=lambda x: (x[1], all_accounts.get(x[0], x[0])),\n reverse=True,\n )\n\n headers = [\"Account Name\", \"Account ID\", \"Total GB\", \"Overage (>100GB)\"]\n rows = []\n\n for acc_id, total_usage in sorted_accounts:\n acc_name = all_accounts.get(acc_id, f\"Deleted/Unknown ({acc_id})\")\n overage = max(0, total_usage - threshold_gb)\n status_icon = \"⚠️\" if overage > 0 else \"✅\"\n print(\n f\"{acc_name[:25]:<25} | {total_usage:>10.2f} GB | {overage:>8.2f} GB {status_icon}\"\n )\n\n rows.append([acc_name, acc_id, round(total_usage, 2), round(overage, 2)])\n\n # Export to Google Sheets\n export_to_sheets(\"AWS_Bandwidth_Usage\", headers, rows)\n\n except Exception as e:\n print(f\"Error: {e}\")\n\n\nif __name__ == \"__main__\":\n get_total_bandwidth_report(100.0)\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nimport calendar\nfrom datetime import datetime, timedelta\nfrom aws_utils import (\n get_account_names,\n get_previous_month_range,\n get_boto_session,\n export_to_sheets,\n)\n\n\ndef get_storage_report(threshold_gb=100.0):\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n all_accounts = get_account_names()\n start_date, end_date = get_previous_month_range()\n\n # Calculate hours in the previous month for Byte-Hour conversion\n dt_end = datetime.strptime(end_date, \"%Y-%m-%d\")\n last_day_prev = dt_end - timedelta(days=1)\n _, num_days = calendar.monthrange(last_day_prev.year, last_day_prev.month)\n hours_in_month = num_days * 24\n\n account_storage_totals = {acc_id: 0.0 for acc_id in all_accounts.keys()}\n next_token = None\n\n try:\n while True:\n params = {\n \"TimePeriod\": {\"Start\": start_date, \"End\": end_date},\n \"Granularity\": \"MONTHLY\",\n \"Metrics\": [\"UsageQuantity\"],\n \"GroupBy\": [\n {\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"},\n {\"Type\": \"DIMENSION\", \"Key\": \"USAGE_TYPE\"},\n ],\n \"Filter\": {\n \"Dimensions\": {\n \"Key\": \"USAGE_TYPE_GROUP\",\n \"Values\": [\n \"S3: Storage - Standard\",\n \"S3: Storage - Infinite Archive\",\n \"EC2: EBS - Optimized Storage\",\n \"RDS: Storage\",\n \"EFS: Storage - Standard\",\n \"EFS: Storage - IA\",\n \"EFS: Storage - Archive\",\n ],\n }\n },\n }\n if next_token:\n params[\"NextPageToken\"] = next_token\n\n response = ce_client.get_cost_and_usage(**params)\n\n for result in response[\"ResultsByTime\"]:\n for group in result[\"Groups\"]:\n acc_id = group[\"Keys\"][0]\n usage_amount = float(group[\"Metrics\"][\"UsageQuantity\"][\"Amount\"])\n unit = group[\"Metrics\"][\"UsageQuantity\"][\"Unit\"]\n\n if unit == \"ByteHrs\":\n usage_amount = (usage_amount / (1024**3)) / hours_in_month\n elif unit in [\"MB-Mo\", \"Megabyte-Months\"]:\n usage_amount /= 1024\n\n if acc_id in account_storage_totals:\n account_storage_totals[acc_id] += usage_amount\n\n next_token = response.get(\"NextPageToken\")\n if not next_token:\n break\n\n print(f\"\\n--- Storage Report ({start_date} to {end_date}) ---\")\n print(f\"{'Account Name':<25} | {'Storage (GB)':<12} | {'Overage (>100GB)'}\")\n print(\"-\" * 75)\n\n rows = []\n sorted_accounts = sorted(\n account_storage_totals.items(), key=lambda x: x[1], reverse=True\n )\n for acc_id, total_gb in sorted_accounts:\n acc_name = all_accounts.get(acc_id, acc_id)\n overage = max(0, total_gb - threshold_gb)\n status_icon = \"⚠️\" if overage > 0 else \"✅\"\n print(\n f\"{acc_name[:25]:<25} | {total_gb:>10.2f} GB | {overage:>8.2f} GB {status_icon}\"\n )\n rows.append([acc_name, acc_id, round(total_gb, 2), round(overage, 2)])\n\n headers = [\n \"Account Name\",\n \"Account ID\",\n f\"Storage GB ({start_date})\",\n \"Overage (>100GB)\",\n ]\n export_to_sheets(\"AWS_Storage_Usage\", headers, rows)\n\n except Exception as e:\n print(f\"Error: {e}\")\n\n\nif __name__ == \"__main__\":\n get_storage_report(100.0)\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import unittest\nfrom unittest.mock import patch, MagicMock\nimport os\nimport sys\nimport importlib.util\n\n# Add project root to path\nPROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), \"..\"))\nsys.path.append(PROJECT_ROOT)\n\n\nclass TestScripts(unittest.TestCase):\n def setUp(self):\n # Patch get_boto_session for all tests to avoid real AWS calls\n self.patcher = patch(\"aws_utils.get_boto_session\")\n self.mock_session = self.patcher.start()\n self.mock_client = MagicMock()\n self.mock_session.return_value.client.return_value = self.mock_client\n\n def tearDown(self):\n self.patcher.stop()\n\n def test_import_bwsuage(self):\n \"\"\"Smoke test to ensure bwsuage.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"bwsuage\", os.path.join(PROJECT_ROOT, \"bwsuage.py\")\n )\n bwsuage = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(bwsuage)\n self.assertTrue(hasattr(bwsuage, \"get_total_bandwidth_report\"))\n\n def test_import_aws_bills(self):\n \"\"\"Smoke test to ensure aws_bills.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"aws_bills\", os.path.join(PROJECT_ROOT, \"aws_bills.py\")\n )\n aws_bills = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(aws_bills)\n self.assertTrue(hasattr(aws_bills, \"get_cost_for_range\"))\n\n def test_import_aws_cost(self):\n \"\"\"Smoke test to ensure aws_cost.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"aws_cost\", os.path.join(PROJECT_ROOT, \"aws_cost.py\")\n )\n aws_cost = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(aws_cost)\n self.assertTrue(hasattr(aws_cost, \"fetch_costs_with_savings\"))\n\n def test_import_aws_vpc(self):\n \"\"\"Smoke test to ensure aws-vpc.py can be imported.\"\"\"\n spec = importlib.util.spec_from_file_location(\n \"aws_vpc\", os.path.join(PROJECT_ROOT, \"aws-vpc.py\")\n )\n aws_vpc = importlib.util.module_from_spec(spec)\n spec.loader.exec_module(aws_vpc)\n self.assertTrue(hasattr(aws_vpc, \"list_vpcs\"))\n\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nfrom botocore.exceptions import ClientError, ProfileNotFound\nfrom aws_utils import setup_org_accounts_session\n\n\ndef list_rds_instances(rds_client):\n instance_names = []\n paginator = rds_client.get_paginator(\"describe_db_instances\")\n for page in paginator.paginate():\n for db_instance in page[\"DBInstances\"]:\n instance_names.append(db_instance[\"DBInstanceIdentifier\"])\n return instance_names\n\n\ndef list_rds_clusters(rds_client):\n cluster_names = []\n paginator = rds_client.get_paginator(\"describe_db_clusters\")\n for page in paginator.paginate():\n for db_cluster in page[\"DBClusters\"]:\n cluster_names.append(db_cluster[\"DBClusterIdentifier\"])\n return cluster_names\n\n\ndef main():\n for account, profile_name in setup_org_accounts_session():\n account_id = account[\"Id\"]\n account_name = account[\"Name\"]\n try:\n boto3.setup_default_session(profile_name=profile_name)\n rds_client = boto3.client(\"rds\")\n\n names = list_rds_instances(rds_client) + list_rds_clusters(rds_client)\n for name in names:\n print(f'\"{account_name}\":\"{name}\"')\n\n print(f\"Processed account '{account_name}' (ID: {account_id})\")\n except (ClientError, ProfileNotFound) as e:\n print(f\"Error accessing account '{account_name}': {e}\")\n\n print(\"\\nRDS names have been successfully listed for all accounts.\")\n\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "import boto3\nimport csv\nfrom botocore.exceptions import ClientError, ProfileNotFound\nfrom aws_utils import setup_org_accounts_session, get_boto_session, export_to_sheets\n\n\ndef get_cloudfront_details(account_id, account_name, cloudfront_client):\n \"\"\"\n Lists all CloudFront distributions for the account and determines\n their associated details.\n \"\"\"\n dist_info_list = []\n try:\n paginator = cloudfront_client.get_paginator(\"list_distributions\")\n for page in paginator.paginate():\n if \"DistributionList\" in page and \"Items\" in page[\"DistributionList\"]:\n for dist in page[\"DistributionList\"][\"Items\"]:\n dist_info_list.append(\n {\n \"AccountName\": account_name,\n \"AccountID\": account_id,\n \"ID\": dist[\"Id\"],\n \"DomainName\": dist[\"DomainName\"],\n \"Status\": dist[\"Status\"],\n \"Enabled\": dist[\"Enabled\"],\n \"Comment\": dist.get(\"Comment\", \"N/A\"),\n }\n )\n except ClientError as e:\n print(f\" -> Error listing distributions: {e}\")\n return dist_info_list\n\n\ndef main():\n get_boto_session()\n all_dist_info = []\n rows_for_sheets = []\n csv_filepath = \"cloudfront_distributions_info.csv\"\n fieldnames = [\n \"AccountName\",\n \"AccountID\",\n \"ID\",\n \"DomainName\",\n \"Status\",\n \"Enabled\",\n \"Comment\",\n ]\n\n with open(csv_filepath, mode=\"w\", newline=\"\") as csvfile:\n writer = csv.DictWriter(csvfile, fieldnames=fieldnames)\n writer.writeheader()\n\n print(\"\\nIterating through accounts to collect CloudFront data...\")\n for account, profile_name in setup_org_accounts_session():\n account_id = account[\"Id\"]\n account_name = account[\"Name\"]\n print(f\"\\n--- Checking Account: {account_name} (ID: {account_id}) ---\")\n\n try:\n boto3.setup_default_session(profile_name=profile_name)\n cloudfront_client = boto3.client(\"cloudfront\")\n\n dist_data = get_cloudfront_details(\n account_id, account_name, cloudfront_client\n )\n for dist in dist_data:\n all_dist_info.append(dist)\n writer.writerow(dist)\n rows_for_sheets.append(list(dist.values()))\n print(\n f\" [FOUND] ID: {dist['ID']}, Domain: {dist['DomainName']}, Status: {dist['Status']}\"\n )\n except (ClientError, ProfileNotFound) as e:\n print(f\" -> Error setting up session/profile '{profile_name}': {e}\")\n\n print(f\"\\nTotal distributions found: {len(all_dist_info)}\")\n print(f\"CSV file '{csv_filepath}' generated successfully.\")\n export_to_sheets(\"cloudfront-distributions\", fieldnames, rows_for_sheets)\n\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully modified file: /Users/bob/git/dxw/aws-stuff/audit.py (1 replacements). Here is the updated code:\n...\nimport os\n\n# Audits mapping: command -> (module_name, file_path)\nAUDITS = {\n \"vpc\": (\"audits.vpc\", \"audits/vpc.py\"),\n \"cost\": (\"audits.cost\", \"audits/cost.py\"),\n \"bills\": (\"aws_bills\", \"aws_bills.py\"),\n \"bw\": (\"bwsuage\", \"bwsuage.py\"),\n \"storage\": (\"storage_usage\", \"storage-usage.py\"),\n \"rds\": (\"list_rds_instances\", \"list_rds_instances.py\"),\n \"asg\": (\"aws_asg_lt_ami\", \"aws-asg-lt-ami.py\"),\n..."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully modified file: /Users/bob/git/dxw/aws-stuff/audit.py (1 replacements). Here is the updated code:\n...\n\n# Audits mapping: command -> (module_name, file_path)\nAUDITS = {\n \"vpc\": (\"audits.vpc\", \"audits/vpc.py\"),\n \"cost\": (\"audits.cost\", \"audits/cost.py\"),\n \"bills\": (\"audits.bills\", \"audits/bills.py\"),\n \"bw\": (\"bwsuage\", \"bwsuage.py\"),\n \"storage\": (\"storage_usage\", \"storage-usage.py\"),\n \"rds\": (\"list_rds_instances\", \"list_rds_instances.py\"),\n \"asg\": (\"aws_asg_lt_ami\", \"aws-asg-lt-ami.py\"),\n \"ips\": (\"aws_ips\", \"aws-ips.py\"),\n..."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully modified file: /Users/bob/git/dxw/aws-stuff/audit.py (1 replacements). Here is the updated code:\n...\n# Audits mapping: command -> (module_name, file_path)\nAUDITS = {\n \"vpc\": (\"audits.vpc\", \"audits/vpc.py\"),\n \"cost\": (\"audits.cost\", \"audits/cost.py\"),\n \"bills\": (\"audits.bills\", \"audits/bills.py\"),\n \"bw\": (\"audits.bw\", \"audits/bw.py\"),\n \"storage\": (\"storage_usage\", \"storage-usage.py\"),\n \"rds\": (\"list_rds_instances\", \"list_rds_instances.py\"),\n \"asg\": (\"aws_asg_lt_ami\", \"aws-asg-lt-ami.py\"),\n \"ips\": (\"aws_ips\", \"aws-ips.py\"),\n \"cloudfront\": (\"cloudfront\", \"cloudfront.py\"),\n..."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully modified file: /Users/bob/git/dxw/aws-stuff/audit.py (1 replacements). Here is the updated code:\n...\nAUDITS = {\n \"vpc\": (\"audits.vpc\", \"audits/vpc.py\"),\n \"cost\": (\"audits.cost\", \"audits/cost.py\"),\n \"bills\": (\"audits.bills\", \"audits/bills.py\"),\n \"bw\": (\"audits.bw\", \"audits/bw.py\"),\n \"storage\": (\"audits.storage\", \"audits/storage.py\"),\n \"rds\": (\"list_rds_instances\", \"list_rds_instances.py\"),\n \"asg\": (\"aws_asg_lt_ami\", \"aws-asg-lt-ami.py\"),\n \"ips\": (\"aws_ips\", \"aws-ips.py\"),\n \"cloudfront\": (\"cloudfront\", \"cloudfront.py\"),\n \"s3\": (\"s3_security_audit\", \"s3-security-audit.py\"),\n..."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully modified file: /Users/bob/git/dxw/aws-stuff/audit.py (1 replacements). Here is the updated code:\n...\n \"vpc\": (\"audits.vpc\", \"audits/vpc.py\"),\n \"cost\": (\"audits.cost\", \"audits/cost.py\"),\n \"bills\": (\"audits.bills\", \"audits/bills.py\"),\n \"bw\": (\"audits.bw\", \"audits/bw.py\"),\n \"storage\": (\"audits.storage\", \"audits/storage.py\"),\n \"rds\": (\"audits.rds\", \"audits/rds.py\"),\n \"asg\": (\"aws_asg_lt_ami\", \"aws-asg-lt-ami.py\"),\n \"ips\": (\"aws_ips\", \"aws-ips.py\"),\n \"cloudfront\": (\"cloudfront\", \"cloudfront.py\"),\n \"s3\": (\"s3_security_audit\", \"s3-security-audit.py\"),\n \"sg\": (\"security_group_audit\", \"security-group-audit.py\"),\n..."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully modified file: /Users/bob/git/dxw/aws-stuff/audit.py (1 replacements). Here is the updated code:\n...\n \"cost\": (\"audits.cost\", \"audits/cost.py\"),\n \"bills\": (\"audits.bills\", \"audits/bills.py\"),\n \"bw\": (\"audits.bw\", \"audits/bw.py\"),\n \"storage\": (\"audits.storage\", \"audits/storage.py\"),\n \"rds\": (\"audits.rds\", \"audits/rds.py\"),\n \"asg\": (\"audits.asg\", \"audits/asg.py\"),\n \"ips\": (\"aws_ips\", \"aws-ips.py\"),\n \"cloudfront\": (\"cloudfront\", \"cloudfront.py\"),\n \"s3\": (\"s3_security_audit\", \"s3-security-audit.py\"),\n \"sg\": (\"security_group_audit\", \"security-group-audit.py\"),\n \"iam\": (\"iam_audit\", \"iam-audit.py\"),\n..."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: (empty)\nProcess Group PGID: 75742"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: (empty)\nProcess Group PGID: 76074"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: (empty)\nProcess Group PGID: 76146"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: Running tests...\n./scripts/test: line 10: pytest: command not found\nExit Code: 127\nProcess Group PGID: 76348"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: Installing dependencies...\n\n[notice] A new release of pip is available: 24.2 -> 26.0.1\n[notice] To update, run: pip install --upgrade pip\n\n[notice] A new release of pip is available: 24.2 -> 26.0.1\n[notice] To update, run: pip install --upgrade pip\nSetup complete.\nRunning tests...\n====================================================== test session starts =======================================================\nplatform darwin -- Python 3.12.13, pytest-9.0.2, pluggy-1.6.0\nrootdir: /Users/bob/git/dxw/aws-stuff\nplugins: anyio-4.12.1\ncollected 18 items \n\ntests/test_aws_utils.py ............ [ 66%]\ntests/test_scripts.py .... [ 88%]\ntests/test_vpc.py .. [100%]\n\n======================================================= 18 passed in 0.14s =======================================================\nProcess Group PGID: 76391"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: (empty)\nProcess Group PGID: 76621"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: (empty)\nProcess Group PGID: 80538"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"error": "[Operation Cancelled] Reason: User denied execution."
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: Running tests...\n====================================================== test session starts =======================================================\nplatform darwin -- Python 3.12.13, pytest-9.0.2, pluggy-1.6.0\nrootdir: /Users/bob/git/dxw/aws-stuff\nplugins: anyio-4.12.1\ncollected 18 items \n\ntests/test_aws_utils.py ............ [ 66%]\ntests/test_bills.py . [ 72%]\ntests/test_bw.py . [ 77%]\ntests/test_cost.py . [ 83%]\ntests/test_storage.py . [ 88%]\ntests/test_vpc.py .. [100%]\n\n======================================================= 18 passed in 0.14s =======================================================\nProcess Group PGID: 81294"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Output: Running tests...\n====================================================== test session starts =======================================================\nplatform darwin -- Python 3.12.13, pytest-9.0.2, pluggy-1.6.0\nrootdir: /Users/bob/git/dxw/aws-stuff\nplugins: anyio-4.12.1\ncollected 20 items \n\ntests/test_asg.py . [ 5%]\ntests/test_aws_utils.py ............ [ 65%]\ntests/test_bills.py . [ 70%]\ntests/test_bw.py . [ 75%]\ntests/test_cost.py . [ 80%]\ntests/test_rds.py . [ 85%]\ntests/test_storage.py . [ 90%]\ntests/test_vpc.py .. [100%]\n\n======================================================= 20 passed in 0.15s =======================================================\nProcess Group PGID: 81383"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully overwrote file: /Users/bob/git/dxw/aws-stuff/scripts/setup. Here is the updated code:\n#!/bin/bash\nset -e\n\n# Create virtual environment if it doesn't exist\nif [ ! -d \".venv\" ]; then\n echo \"Creating virtual environment...\"\n python3 -m venv .venv\nfi\n\n# Activate virtualenv and install dependencies\nsource .venv/bin/activate\necho \"Installing dependencies...\"\npip install --quiet -r requirements.txt\npip install --quiet pytest ruff\n\n# Setup .env if it doesn't exist\nif [ ! -f \".env\" ]; then\n echo \"Copying .env.example to .env...\"\n cp .env.example .env\nfi\n\necho \"Setup complete.\"\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully overwrote file: /Users/bob/git/dxw/aws-stuff/scripts/test. Here is the updated code:\n#!/bin/bash\nset -e\n\n# Use current virtualenv if available\nif [ -d \".venv\" ]; then\n source .venv/bin/activate\nfi\n\necho \"Running tests...\"\npytest tests/\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully overwrote file: /Users/bob/git/dxw/aws-stuff/scripts/lint. Here is the updated code:\n#!/bin/bash\nset -e\n\n# Use current virtualenv if available\nif [ -d \".venv\" ]; then\n source .venv/bin/activate\nfi\n\necho \"Running linting (ruff)...\"\nruff check .\necho \"Running formatting check (ruff)...\"\nruff format --check .\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audits/vpc.py. Here is the updated code:\nimport boto3\nimport csv\nfrom botocore.exceptions import ClientError, ProfileNotFound\nfrom aws_utils import setup_org_accounts_session, get_boto_session, export_to_sheets\n\ndef get_vpc_name(tags):\n if tags:\n for tag in tags:\n if tag[\"Key\"] == \"Name\":\n return tag[\"Value\"]\n return \"Unnamed VPC\"\n\ndef fetch_vpcs(ec2_client):\n \"\"\"\n Fetches VPC information from a single region.\n Returns a list of dicts.\n \"\"\"\n vpcs_info = []\n try:\n paginator = ec2_client.get_paginator(\"describe_vpcs\")\n for page in paginator.paginate():\n for vpc in page[\"Vpcs\"]:\n vpcs_info.append(\n {\n \"VPCName\": get_vpc_name(vpc.get(\"Tags\")),\n \"VPCID\": vpc[\"VpcId\"],\n \"CIDRBlock\": vpc.get(\"CidrBlock\", \"N/A\"),\n }\n )\n except ClientError as e:\n print(f\"Error fetching VPCs: {e}\")\n return vpcs_info\n\ndef get_all_org_vpcs(regions=None):\n \"\"\"\n Iterates through all accounts and regions to gather VPC info.\n Returns a list of rows (dicts) for the report.\n \"\"\"\n if regions is None:\n regions = [\"eu-west-1\", \"eu-west-2\"]\n \n all_rows = []\n for account, profile_name in setup_org_accounts_session():\n account_id = account[\"Id\"]\n account_name = account[\"Name\"]\n try:\n # We use a session per account\n account_session = boto3.Session(profile_name=profile_name)\n for region in regions:\n try:\n ec2_client = account_session.client(\"ec2\", region_name=region)\n vpcs_info = fetch_vpcs(ec2_client)\n \n for vpc in vpcs_info:\n row = {\n \"AccountName\": account_name,\n \"AccountID\": account_id,\n \"Region\": region,\n \"VPCName\": vpc[\"VPCName\"],\n \"VPCID\": vpc[\"VPCID\"],\n \"CIDRBlock\": vpc[\"CIDRBlock\"],\n }\n all_rows.append(row)\n \n if vpcs_info:\n print(f'\"{account_name}\" ({account_id}, {region}) VPCs found: {len(vpcs_info)}')\n except ClientError as e:\n print(f\"Error in {region} for {account_name}: {e}\")\n except (ClientError, ProfileNotFound) as e:\n print(f\"Error accessing account {account_name}: {e}\")\n \n return all_rows\n\ndef run():\n \"\"\"Main entry point for the VPC audit.\"\"\"\n get_boto_session() # Ensure SSO login\n \n rows = get_all_org_vpcs()\n fieldnames = [\"AccountName\", \"AccountID\", \"Region\", \"VPCName\", \"VPCID\", \"CIDRBlock\"]\n \n # Write to CSV\n output_file = \"aws_vpcs_by_region.csv\"\n with open(output_file, mode=\"w\", newline=\"\") as csvfile:\n writer = csv.DictWriter(csvfile, fieldnames=fieldnames)\n writer.writeheader()\n for row in rows:\n writer.writerow(row)\n \n print(f\"\\nTotal number of VPCs found: {len(rows)}\")\n print(f\"CSV file '{output_file}' generated successfully.\")\n \n # Export to Google Sheets\n rows_for_sheets = [list(row.values()) for row in rows]\n export_to_sheets(\"aws-vpc\", fieldnames, rows_for_sheets)\n\nif __name__ == \"__main__\":\n run()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_vpc.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock, patch\nfrom audits.vpc import fetch_vpcs, get_vpc_name\n\nclass TestVpcAudit(unittest.TestCase):\n def test_get_vpc_name(self):\n tags = [{\"Key\": \"Name\", \"Value\": \"MyVPC\"}]\n self.assertEqual(get_vpc_name(tags), \"MyVPC\")\n \n tags_no_name = [{\"Key\": \"Environment\", \"Value\": \"Prod\"}]\n self.assertEqual(get_vpc_name(tags_no_name), \"Unnamed VPC\")\n \n self.assertEqual(get_vpc_name([]), \"Unnamed VPC\")\n self.assertEqual(get_vpc_name(None), \"Unnamed VPC\")\n\n def test_fetch_vpcs(self):\n # Mock EC2 client and paginator\n mock_ec2 = MagicMock()\n mock_paginator = MagicMock()\n mock_ec2.get_paginator.return_value = mock_paginator\n \n # Mock the response from describe_vpcs\n mock_paginator.paginate.return_value = [\n {\n \"Vpcs\": [\n {\n \"VpcId\": \"vpc-123\",\n \"CidrBlock\": \"10.0.0.0/16\",\n \"Tags\": [{\"Key\": \"Name\", \"Value\": \"TestVPC\"}]\n },\n {\n \"VpcId\": \"vpc-456\",\n \"CidrBlock\": \"192.168.0.0/24\"\n # No Name tag\n }\n ]\n }\n ]\n \n vpcs = fetch_vpcs(mock_ec2)\n \n self.assertEqual(len(vpcs), 2)\n self.assertEqual(vpcs[0][\"VPCName\"], \"TestVPC\")\n self.assertEqual(vpcs[0][\"VPCID\"], \"vpc-123\")\n self.assertEqual(vpcs[0][\"CIDRBlock\"], \"10.0.0.0/16\")\n \n self.assertEqual(vpcs[1][\"VPCName\"], \"Unnamed VPC\")\n self.assertEqual(vpcs[1][\"VPCID\"], \"vpc-456\")\n self.assertEqual(vpcs[1][\"CIDRBlock\"], \"192.168.0.0/24\")\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audit.py. Here is the updated code:\nimport argparse\nimport sys\nimport importlib.util\nimport os\n\n# Audits mapping: command -> (module_name, file_path)\nAUDITS = {\n \"vpc\": (\"audits.vpc\", \"audits/vpc.py\"),\n \"cost\": (\"aws_cost\", \"aws_cost.py\"),\n \"bills\": (\"aws_bills\", \"aws_bills.py\"),\n \"bw\": (\"bwsuage\", \"bwsuage.py\"),\n \"storage\": (\"storage_usage\", \"storage-usage.py\"),\n \"rds\": (\"list_rds_instances\", \"list_rds_instances.py\"),\n \"asg\": (\"aws_asg_lt_ami\", \"aws-asg-lt-ami.py\"),\n \"ips\": (\"aws_ips\", \"aws-ips.py\"),\n \"cloudfront\": (\"cloudfront\", \"cloudfront.py\"),\n \"s3\": (\"s3_security_audit\", \"s3-security-audit.py\"),\n \"sg\": (\"security_group_audit\", \"security-group-audit.py\"),\n \"iam\": (\"iam_audit\", \"iam-audit.py\"),\n \"carbon\": (\"aws_carbon\", \"aws-carbon.py\"),\n \"metrics\": (\"aws_metrics\", \"aws-metrics.py\"),\n}\n\ndef run_audit(name):\n if name not in AUDITS:\n print(f\"Error: Unknown audit '{name}'\")\n print_usage()\n sys.exit(1)\n \n module_name, file_path = AUDITS[name]\n \n if not os.path.exists(file_path):\n print(f\"Error: File not found for audit '{name}': {file_path}\")\n sys.exit(1)\n \n print(f\"Running audit: {name}...\")\n \n # Import and run the module\n spec = importlib.util.spec_from_file_location(module_name, file_path)\n module = importlib.util.module_from_spec(spec)\n sys.modules[module_name] = module\n spec.loader.exec_module(module)\n \n # Check if there's a 'run' or 'main' function\n if hasattr(module, \"run\"):\n module.run()\n elif hasattr(module, \"main\"):\n module.main()\n else:\n print(f\"Error: Audit '{name}' (in {file_path}) does not have a run() or main() function.\")\n sys.exit(1)\n\ndef print_usage():\n print(\"\\nAvailable audits:\")\n for name in sorted(AUDITS.keys()):\n print(f\" - {name}\")\n\ndef main():\n parser = argparse.ArgumentParser(description=\"AWS Organization Audit CLI\")\n parser.add_argument(\"audit\", nargs=\"?\", help=\"Name of the audit to run\")\n parser.add_argument(\"--list\", action=\"store_true\", help=\"List available audits\")\n \n args = parser.parse_args()\n \n if args.list or not args.audit:\n print_usage()\n return\n \n run_audit(args.audit)\n\nif __name__ == \"__main__\":\n main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/scripts/audit. Here is the updated code:\n#!/bin/bash\nset -e\n\n# Use current virtualenv if available\nif [ -d \".venv\" ]; then\n source .venv/bin/activate\nfi\n\npython3 audit.py \"$@\"\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audits/cost.py. Here is the updated code:\nimport boto3\nfrom botocore.exceptions import ClientError\nfrom aws_utils import get_account_names, get_previous_month_range, get_boto_session\n\ndef fetch_costs_with_savings(ce_client, start_date, end_date, account_map):\n \"\"\"\n Fetches EC2 costs grouped by account and service.\n Returns a list of dicts with cost details.\n \"\"\"\n results = []\n try:\n response = ce_client.get_cost_and_usage(\n TimePeriod={\"Start\": start_date, \"End\": end_date},\n Granularity=\"MONTHLY\",\n Metrics=[\n \"UnblendedCost\",\n \"AmortizedCost\",\n \"NetAmortizedCost\",\n \"NetUnblendedCost\",\n ],\n GroupBy=[\n {\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"},\n {\"Type\": \"DIMENSION\", \"Key\": \"SERVICE\"},\n ],\n )\n\n if not response.get(\"ResultsByTime\"):\n return results\n\n for result in response[\"ResultsByTime\"]:\n period_start = result[\"TimePeriod\"][\"Start\"]\n\n for group in result[\"Groups\"]:\n account_id = group[\"Keys\"][0]\n service = group[\"Keys\"][1]\n\n if \"Amazon Elastic Compute Cloud\" in service:\n metrics = group[\"Metrics\"]\n unblended = float(metrics[\"UnblendedCost\"][\"Amount\"])\n amortized = float(metrics[\"AmortizedCost\"][\"Amount\"])\n net_amortized = float(metrics[\"NetAmortizedCost\"][\"Amount\"])\n net_unblended = float(metrics[\"NetUnblendedCost\"][\"Amount\"])\n unit = metrics[\"UnblendedCost\"][\"Unit\"]\n\n account_name = account_map.get(account_id, account_id)\n\n results.append({\n \"PeriodStart\": period_start,\n \"AccountName\": account_name,\n \"AccountID\": account_id,\n \"Service\": service,\n \"UnblendedCost\": unblended,\n \"AmortizedCost\": amortized,\n \"NetAmortizedCost\": net_amortized,\n \"NetUnblendedCost\": net_unblended,\n \"Unit\": unit\n })\n\n except ClientError as e:\n print(f\"Error fetching costs: {e}\")\n \n return results\n\ndef run():\n session = get_boto_session()\n ce_client = session.client(\"ce\", region_name=\"us-east-1\")\n account_map = get_account_names()\n start_date, end_date = get_previous_month_range()\n\n print(f\"Fetching AWS EC2 costs with compute savings from {start_date} to {end_date}...\")\n \n cost_data = fetch_costs_with_savings(ce_client, start_date, end_date, account_map)\n \n if not cost_data:\n print(\"No cost data available for the specified period.\")\n return\n\n for item in cost_data:\n print(f\"Billing Period: {item['PeriodStart']}\")\n print(f\"Account: {item['AccountName']} | Service: {item['Service']}\")\n print(f\" - Total Cost (Unblended): {item['UnblendedCost']} {item['Unit']}\")\n print(f\" - Net Cost (Amortized): {item['NetAmortizedCost']} {item['Unit']}\")\n print(f\" - Net Cost (Unblended): {item['NetUnblendedCost']} {item['Unit']}\")\n print(f\" - Final Cost (Amortized): {item['AmortizedCost']} {item['Unit']}\")\n\nif __name__ == \"__main__\":\n run()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audits/bills.py. Here is the updated code:\nimport boto3\nfrom botocore.exceptions import ClientError\nfrom aws_utils import get_account_names, get_last_n_months_ranges, get_boto_session\n\ndef fetch_bills(ce_client, ranges):\n \"\"\"\n Fetches unblended and amortized costs across given ranges.\n Returns a dict: { account_id: { \"unblended\": [m1, m2, m3], \"amortized\": [m1, m2, m3] } }\n \"\"\"\n account_costs = {}\n \n for start_date, end_date, _ in ranges:\n try:\n response = ce_client.get_cost_and_usage(\n TimePeriod={\"Start\": start_date, \"End\": end_date},\n Granularity=\"MONTHLY\",\n Metrics=[\"UnblendedCost\", \"AmortizedCost\"],\n GroupBy=[{\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"}],\n )\n\n for result in response[\"ResultsByTime\"]:\n for group in result[\"Groups\"]:\n account_id = group[\"Keys\"][0]\n unblended = float(group[\"Metrics\"][\"UnblendedCost\"][\"Amount\"])\n amortized = float(group[\"Metrics\"][\"AmortizedCost\"][\"Amount\"])\n\n if account_id not in account_costs:\n account_costs[account_id] = {\"unblended\": [], \"amortized\": []}\n\n account_costs[account_id][\"unblended\"].append(unblended)\n account_costs[account_id][\"amortized\"].append(amortized)\n except ClientError as e:\n print(f\"Error fetching cost for range {start_date} to {end_date}: {e}\")\n \n return account_costs\n\ndef run():\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n account_names = get_account_names()\n\n ranges = get_last_n_months_ranges(3)\n months_labels = [r[2] for r in ranges]\n\n account_costs = fetch_bills(ce_client, ranges)\n\n # Sort accounts by the most recent month's amortized cost\n sorted_accounts = sorted(\n account_costs.items(),\n key=lambda item: item[1][\"amortized\"][0] if item[1][\"amortized\"] else 0,\n reverse=True,\n )\n\n print(f\"\\nAWS Costs for the last 3 months (Amortized and Unblended, sorted by highest Amortized Cost in {months_labels[0]}):\")\n header = f\"{'Account Name':<30} {' ' + months_labels[0] + ' (Amort)':<15} {' ' + months_labels[1] + ' (Amort)':<15} {' ' + months_labels[2] + ' (Amort)':<15} {' ' + months_labels[0] + ' (Unbl)':<15} {' ' + months_labels[1] + ' (Unbl)':<15} {' ' + months_labels[2] + ' (Unbl)':<15}\"\n print(header)\n print(\"-\" * 125)\n\n for account_id, costs in sorted_accounts:\n name = account_names.get(account_id, f\"Unknown ({account_id})\")\n\n # Ensure we have 3 months of data\n amortized = costs[\"amortized\"] + [0.0] * (3 - len(costs[\"amortized\"]))\n unblended = costs[\"unblended\"] + [0.0] * (3 - len(costs[\"unblended\"]))\n\n amortized_fmt = [f\"${c:,.2f}\" for c in amortized[:3]]\n unblended_fmt = [f\"${c:,.2f}\" for c in unblended[:3]]\n\n print(\n f\"{name[:30]:<30} {amortized_fmt[0]:>15} {amortized_fmt[1]:>15} {amortized_fmt[2]:>15} {unblended_fmt[0]:>15} {unblended_fmt[1]:>15} {unblended_fmt[2]:>15}\"\n )\n\nif __name__ == \"__main__\":\n run()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_cost.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock\nfrom audits.cost import fetch_costs_with_savings\n\nclass TestCostAudit(unittest.TestCase):\n def test_fetch_costs_with_savings(self):\n mock_ce = MagicMock()\n \n # Mock response\n mock_ce.get_cost_and_usage.return_value = {\n \"ResultsByTime\": [\n {\n \"TimePeriod\": {\"Start\": \"2023-10-01\", \"End\": \"2023-11-01\"},\n \"Groups\": [\n {\n \"Keys\": [\"123456789012\", \"Amazon Elastic Compute Cloud - Compute\"],\n \"Metrics\": {\n \"UnblendedCost\": {\"Amount\": \"100.0\", \"Unit\": \"USD\"},\n \"AmortizedCost\": {\"Amount\": \"90.0\", \"Unit\": \"USD\"},\n \"NetAmortizedCost\": {\"Amount\": \"85.0\", \"Unit\": \"USD\"},\n \"NetUnblendedCost\": {\"Amount\": \"95.0\", \"Unit\": \"USD\"},\n }\n }\n ]\n }\n ]\n }\n \n account_map = {\"123456789012\": \"Test Account\"}\n \n results = fetch_costs_with_savings(mock_ce, \"2023-10-01\", \"2023-11-01\", account_map)\n \n self.assertEqual(len(results), 1)\n self.assertEqual(results[0][\"AccountName\"], \"Test Account\")\n self.assertEqual(results[0][\"Service\"], \"Amazon Elastic Compute Cloud - Compute\")\n self.assertEqual(results[0][\"UnblendedCost\"], 100.0)\n self.assertEqual(results[0][\"AmortizedCost\"], 90.0)\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_bills.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock\nfrom audits.bills import fetch_bills\n\nclass TestBillsAudit(unittest.TestCase):\n def test_fetch_bills(self):\n mock_ce = MagicMock()\n \n # Mock response for one range\n mock_ce.get_cost_and_usage.return_value = {\n \"ResultsByTime\": [\n {\n \"Groups\": [\n {\n \"Keys\": [\"123456789012\"],\n \"Metrics\": {\n \"UnblendedCost\": {\"Amount\": \"100.0\", \"Unit\": \"USD\"},\n \"AmortizedCost\": {\"Amount\": \"90.0\", \"Unit\": \"USD\"},\n }\n }\n ]\n }\n ]\n }\n \n ranges = [(\"2023-10-01\", \"2023-11-01\", \"2023-10\")]\n \n results = fetch_bills(mock_ce, ranges)\n \n self.assertIn(\"123456789012\", results)\n self.assertEqual(results[\"123456789012\"][\"unblended\"], [100.0])\n self.assertEqual(results[\"123456789012\"][\"amortized\"], [90.0])\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audits/bw.py. Here is the updated code:\nimport boto3\nfrom botocore.exceptions import ClientError\nfrom aws_utils import (\n get_account_names,\n get_previous_month_range,\n get_boto_session,\n export_to_sheets,\n)\n\ndef fetch_bandwidth(ce_client, start_date, end_date, all_accounts):\n \"\"\"\n Fetches bandwidth usage grouped by account.\n Returns a dict: { account_id: total_usage_in_gb }\n \"\"\"\n account_totals = {acc_id: 0.0 for acc_id in all_accounts.keys()}\n next_token = None\n\n try:\n while True:\n params = {\n \"TimePeriod\": {\"Start\": start_date, \"End\": end_date},\n \"Granularity\": \"MONTHLY\",\n \"Metrics\": [\"UsageQuantity\"],\n \"GroupBy\": [\n {\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"},\n {\"Type\": \"DIMENSION\", \"Key\": \"USAGE_TYPE\"},\n ],\n }\n if next_token:\n params[\"NextPageToken\"] = next_token\n\n response = ce_client.get_cost_and_usage(**params)\n\n for result in response.get(\"ResultsByTime\", []):\n for group in result.get(\"Groups\", []):\n acc_id = group[\"Keys\"][0]\n usage_type = group[\"Keys\"][1]\n usage_amount = float(group[\"Metrics\"][\"UsageQuantity\"][\"Amount\"])\n unit = group[\"Metrics\"][\"UsageQuantity\"][\"Unit\"]\n\n if \"DataTransfer\" in usage_type:\n if unit == \"Bytes\":\n usage_amount /= 1024**3\n elif unit in [\"MB\", \"Megabytes\"]:\n usage_amount /= 1024\n\n if acc_id in account_totals:\n account_totals[acc_id] += usage_amount\n else:\n account_totals[acc_id] = usage_amount\n\n next_token = response.get(\"NextPageToken\")\n if not next_token:\n break\n except ClientError as e:\n print(f\"Error fetching bandwidth: {e}\")\n\n return account_totals\n\ndef run():\n threshold_gb = 100.0\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n all_accounts = get_account_names()\n \n if not all_accounts:\n print(\"No accounts found.\")\n return\n\n start_date, end_date = get_previous_month_range()\n \n account_totals = fetch_bandwidth(ce_client, start_date, end_date, all_accounts)\n\n print(f\"\\n--- Full Bandwidth Report ({start_date} to {end_date}) ---\")\n print(f\"{'Account Name':<25} | {'Total GB':<12} | {'Overage (>100GB)'}\")\n print(\"-\" * 75)\n\n sorted_accounts = sorted(\n account_totals.items(),\n key=lambda x: (x[1], all_accounts.get(x[0], x[0])),\n reverse=True,\n )\n\n headers = [\"Account Name\", \"Account ID\", \"Total GB\", \"Overage (>100GB)\"]\n rows = []\n\n for acc_id, total_usage in sorted_accounts:\n acc_name = all_accounts.get(acc_id, f\"Deleted/Unknown ({acc_id})\")\n overage = max(0, total_usage - threshold_gb)\n status_icon = \"⚠️\" if overage > 0 else \"✅\"\n print(\n f\"{acc_name[:25]:<25} | {total_usage:>10.2f} GB | {overage:>8.2f} GB {status_icon}\"\n )\n\n rows.append([acc_name, acc_id, round(total_usage, 2), round(overage, 2)])\n\n # Export to Google Sheets\n export_to_sheets(\"AWS_Bandwidth_Usage\", headers, rows)\n\nif __name__ == \"__main__\":\n run()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audits/storage.py. Here is the updated code:\nimport boto3\nimport calendar\nfrom datetime import datetime, timedelta\nfrom botocore.exceptions import ClientError\nfrom aws_utils import (\n get_account_names,\n get_previous_month_range,\n get_boto_session,\n export_to_sheets,\n)\n\ndef fetch_storage(ce_client, start_date, end_date, all_accounts, hours_in_month):\n \"\"\"\n Fetches storage usage grouped by account.\n Returns a dict: { account_id: total_gb }\n \"\"\"\n account_storage_totals = {acc_id: 0.0 for acc_id in all_accounts.keys()}\n next_token = None\n\n try:\n while True:\n params = {\n \"TimePeriod\": {\"Start\": start_date, \"End\": end_date},\n \"Granularity\": \"MONTHLY\",\n \"Metrics\": [\"UsageQuantity\"],\n \"GroupBy\": [\n {\"Type\": \"DIMENSION\", \"Key\": \"LINKED_ACCOUNT\"},\n {\"Type\": \"DIMENSION\", \"Key\": \"USAGE_TYPE\"},\n ],\n \"Filter\": {\n \"Dimensions\": {\n \"Key\": \"USAGE_TYPE_GROUP\",\n \"Values\": [\n \"S3: Storage - Standard\",\n \"S3: Storage - Infinite Archive\",\n \"EC2: EBS - Optimized Storage\",\n \"RDS: Storage\",\n \"EFS: Storage - Standard\",\n \"EFS: Storage - IA\",\n \"EFS: Storage - Archive\",\n ],\n }\n },\n }\n if next_token:\n params[\"NextPageToken\"] = next_token\n\n response = ce_client.get_cost_and_usage(**params)\n\n for result in response.get(\"ResultsByTime\", []):\n for group in result.get(\"Groups\", []):\n acc_id = group[\"Keys\"][0]\n usage_amount = float(group[\"Metrics\"][\"UsageQuantity\"][\"Amount\"])\n unit = group[\"Metrics\"][\"UsageQuantity\"][\"Unit\"]\n\n if unit == \"ByteHrs\":\n usage_amount = (usage_amount / (1024**3)) / hours_in_month\n elif unit in [\"MB-Mo\", \"Megabyte-Months\"]:\n usage_amount /= 1024\n\n if acc_id in account_storage_totals:\n account_storage_totals[acc_id] += usage_amount\n\n next_token = response.get(\"NextPageToken\")\n if not next_token:\n break\n except ClientError as e:\n print(f\"Error fetching storage: {e}\")\n \n return account_storage_totals\n\ndef run():\n threshold_gb = 100.0\n session = get_boto_session()\n ce_client = session.client(\"ce\")\n all_accounts = get_account_names()\n start_date, end_date = get_previous_month_range()\n\n # Calculate hours in the previous month for Byte-Hour conversion\n dt_end = datetime.strptime(end_date, \"%Y-%m-%d\")\n last_day_prev = dt_end - timedelta(days=1)\n _, num_days = calendar.monthrange(last_day_prev.year, last_day_prev.month)\n hours_in_month = num_days * 24\n\n account_storage_totals = fetch_storage(ce_client, start_date, end_date, all_accounts, hours_in_month)\n\n print(f\"\\n--- Storage Report ({start_date} to {end_date}) ---\")\n print(f\"{'Account Name':<25} | {'Storage (GB)':<12} | {'Overage (>100GB)'}\")\n print(\"-\" * 75)\n\n rows = []\n sorted_accounts = sorted(\n account_storage_totals.items(), key=lambda x: x[1], reverse=True\n )\n for acc_id, total_gb in sorted_accounts:\n acc_name = all_accounts.get(acc_id, acc_id)\n overage = max(0, total_gb - threshold_gb)\n status_icon = \"⚠️\" if overage > 0 else \"✅\"\n print(\n f\"{acc_name[:25]:<25} | {total_gb:>10.2f} GB | {overage:>8.2f} GB {status_icon}\"\n )\n rows.append([acc_name, acc_id, round(total_gb, 2), round(overage, 2)])\n\n headers = [\n \"Account Name\",\n \"Account ID\",\n f\"Storage GB ({start_date})\",\n \"Overage (>100GB)\",\n ]\n export_to_sheets(\"AWS_Storage_Usage\", headers, rows)\n\nif __name__ == \"__main__\":\n run()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_bw.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock\nfrom audits.bw import fetch_bandwidth\n\nclass TestBwAudit(unittest.TestCase):\n def test_fetch_bandwidth(self):\n mock_ce = MagicMock()\n \n # Mock response\n mock_ce.get_cost_and_usage.return_value = {\n \"ResultsByTime\": [\n {\n \"Groups\": [\n {\n \"Keys\": [\"123456789012\", \"DataTransfer-Out-Bytes\"],\n \"Metrics\": {\n \"UsageQuantity\": {\"Amount\": str(1024**3 * 50), \"Unit\": \"Bytes\"},\n }\n },\n {\n \"Keys\": [\"123456789012\", \"DataTransfer-Regional-Bytes\"],\n \"Metrics\": {\n \"UsageQuantity\": {\"Amount\": str(1024**3 * 10), \"Unit\": \"Bytes\"},\n }\n }\n ]\n }\n ]\n }\n \n all_accounts = {\"123456789012\": \"Test Account\"}\n \n results = fetch_bandwidth(mock_ce, \"2023-10-01\", \"2023-11-01\", all_accounts)\n \n self.assertIn(\"123456789012\", results)\n # Total usage should be 50 + 10 = 60 GB\n self.assertEqual(results[\"123456789012\"], 60.0)\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_storage.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock\nfrom audits.storage import fetch_storage\n\nclass TestStorageAudit(unittest.TestCase):\n def test_fetch_storage(self):\n mock_ce = MagicMock()\n \n # Mock response\n mock_ce.get_cost_and_usage.return_value = {\n \"ResultsByTime\": [\n {\n \"Groups\": [\n {\n \"Keys\": [\"123456789012\", \"S3: Storage - Standard\"],\n \"Metrics\": {\n \"UsageQuantity\": {\"Amount\": str(1024**3 * 100 * 720), \"Unit\": \"ByteHrs\"},\n }\n }\n ]\n }\n ]\n }\n \n all_accounts = {\"123456789012\": \"Test Account\"}\n hours_in_month = 720\n \n results = fetch_storage(mock_ce, \"2023-10-01\", \"2023-11-01\", all_accounts, hours_in_month)\n \n self.assertIn(\"123456789012\", results)\n # Usage should be 100 GB\n self.assertEqual(results[\"123456789012\"], 100.0)\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/audits/rds.py. Here is the updated code:\nimport boto3\nfrom botocore.exceptions import ClientError, ProfileNotFound\nfrom aws_utils import setup_org_accounts_session\n\ndef fetch_rds_resources(rds_client):\n \"\"\"\n Fetches RDS instances and clusters names.\n Returns a list of strings.\n \"\"\"\n resources = []\n try:\n # Instances\n paginator = rds_client.get_paginator(\"describe_db_instances\")\n for page in paginator.paginate():\n for db_instance in page[\"DBInstances\"]:\n resources.append(db_instance[\"DBInstanceIdentifier\"])\n \n # Clusters\n paginator = rds_client.get_paginator(\"describe_db_clusters\")\n for page in paginator.paginate():\n for db_cluster in page[\"DBClusters\"]:\n resources.append(db_cluster[\"DBClusterIdentifier\"])\n except ClientError as e:\n print(f\"Error fetching RDS resources: {e}\")\n \n return resources\n\ndef run():\n \"\"\"Main entry point for RDS audit.\"\"\"\n for account, profile_name in setup_org_accounts_session():\n account_id = account[\"Id\"]\n account_name = account[\"Name\"]\n try:\n account_session = boto3.Session(profile_name=profile_name)\n rds_client = account_session.client(\"rds\")\n\n names = fetch_rds_resources(rds_client)\n for name in names:\n print(f'\"{account_name}\":\"{name}\"')\n\n if names:\n print(f\"Processed account '{account_name}' (ID: {account_id})\")\n except (ClientError, ProfileNotFound) as e:\n print(f\"Error accessing account '{account_name}': {e}\")\n\n print(\"\\nRDS names have been successfully listed for all accounts.\")\n\nif __name__ == \"__main__\":\n run()\n"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_rds.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock\nfrom audits.rds import fetch_rds_resources\n\nclass TestRdsAudit(unittest.TestCase):\n def test_fetch_rds_resources(self):\n mock_rds = MagicMock()\n \n # Mock instance response\n mock_rds.get_paginator.return_value.paginate.side_effect = [\n [{\"DBInstances\": [{\"DBInstanceIdentifier\": \"db-1\"}]}], # first call (instances)\n [{\"DBClusters\": [{\"DBClusterIdentifier\": \"cluster-1\"}]}] # second call (clusters)\n ]\n \n resources = fetch_rds_resources(mock_rds)\n \n self.assertEqual(len(resources), 2)\n self.assertIn(\"db-1\", resources)\n self.assertIn(\"cluster-1\", resources)\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"output": "Successfully created and wrote to new file: /Users/bob/git/dxw/aws-stuff/tests/test_asg.py. Here is the updated code:\nimport unittest\nfrom unittest.mock import MagicMock, patch\nfrom audits.asg import fetch_asg_details\n\nclass TestAsgAudit(unittest.TestCase):\n @patch(\"audits.asg.get_ami_name\")\n def test_fetch_asg_details_launch_config(self, mock_get_ami_name):\n mock_asg_client = MagicMock()\n mock_ec2_client = MagicMock()\n mock_get_ami_name.return_value = \"TestAMI\"\n \n # Mock ASG with Launch Config\n mock_asg_client.get_paginator.return_value.paginate.return_value = [\n {\n \"AutoScalingGroups\": [\n {\n \"AutoScalingGroupName\": \"asg-lc\",\n \"LaunchConfigurationName\": \"lc-1\"\n }\n ]\n }\n ]\n \n # Mock LC response\n mock_asg_client.describe_launch_configurations.return_value = {\n \"LaunchConfigurations\": [{\"ImageId\": \"ami-123\"}]\n }\n \n results = fetch_asg_details(mock_asg_client, mock_ec2_client, \"123\", \"Acc\", \"region\")\n \n self.assertEqual(len(results), 1)\n self.assertEqual(results[0][\"ASG_Name\"], \"asg-lc\")\n self.assertEqual(results[0][\"Template_Type\"], \"LaunchConfiguration\")\n self.assertEqual(results[0][\"AMI_ID\"], \"ami-123\")\n\nif __name__ == \"__main__\":\n unittest.main()\n"
|
||||
}
|
||||
Reference in New Issue
Block a user