Added pytest + some tests.

Added AWS S3 Support (optional, for cloud image storage)
This commit is contained in:
George Khananaev
2025-06-03 00:12:11 +07:00
parent 84399dfbe8
commit 50aaa9ce26
9 changed files with 755 additions and 5 deletions

146
README.md
View File

@@ -18,6 +18,7 @@
- Snags EVERY damn photo from reviews and profiles - Snags EVERY damn photo from reviews and profiles
- Hoards local paths or swaps URLs to your domain like a boss - Hoards local paths or swaps URLs to your domain like a boss
- Multi-threaded downloading that would make NASA jealous - Multi-threaded downloading that would make NASA jealous
- **S3 Cloud Storage**: Auto-upload images to AWS S3 with custom folder structure
- **Time-Bending Magic**: Transforms Google's vague "2 weeks ago" garbage into precise ISO timestamps - **Time-Bending Magic**: Transforms Google's vague "2 weeks ago" garbage into precise ISO timestamps
- **Sort Any Damn Way**: Newest, highest, lowest, relevance - we've got you covered - **Sort Any Damn Way**: Newest, highest, lowest, relevance - we've got you covered
- **Metadata on Steroids**: Inject custom parameters into every review record - **Metadata on Steroids**: Inject custom parameters into every review record
@@ -32,6 +33,7 @@
Python 3.10+ (don't even try with 3.9, seriously) Python 3.10+ (don't even try with 3.9, seriously)
Chrome browser (the fresher the better) Chrome browser (the fresher the better)
MongoDB (optional, but c'mon, live a little) MongoDB (optional, but c'mon, live a little)
AWS S3 Account (optional, for cloud image storage)
Coffee (mandatory for watching thousands of reviews roll in) Coffee (mandatory for watching thousands of reviews roll in)
``` ```
@@ -94,6 +96,19 @@ image_dir: "review_images" # Directory to store downloaded images
download_threads: 4 # Number of threads for downloading images download_threads: 4 # Number of threads for downloading images
store_local_paths: true # Whether to store local image paths in documents store_local_paths: true # Whether to store local image paths in documents
# S3 settings (optional)
use_s3: false # Whether to upload images to S3
s3:
aws_access_key_id: "" # AWS Access Key ID
aws_secret_access_key: "" # AWS Secret Access Key
region_name: "us-east-1" # AWS region
bucket_name: "" # S3 bucket name
prefix: "reviews/" # Base prefix for uploaded files
profiles_folder: "profiles/" # Folder name for profile images within prefix
reviews_folder: "reviews/" # Folder name for review images within prefix
delete_local_after_upload: false # Delete local files after successful S3 upload
s3_base_url: "" # Custom S3 base URL for accessing files (if empty, uses AWS default)
# URL replacement settings # URL replacement settings
replace_urls: true # Whether to replace original URLs with custom ones replace_urls: true # Whether to replace original URLs with custom ones
custom_url_base: "https://yourdomain.com/images" # Base URL for replacement custom_url_base: "https://yourdomain.com/images" # Base URL for replacement
@@ -150,6 +165,12 @@ python start.py --url "https://maps.app.goo.gl/YOUR_URL" --download-images true
# Every. Single. Picture. With your domain stamped all over 'em. # Every. Single. Picture. With your domain stamped all over 'em.
``` ```
6. S3 Cloud Storage Beast Mode:
```bash
python start.py --url "https://maps.app.goo.gl/YOUR_URL" --download-images true --use-s3 true
# Downloads locally AND uploads to S3. Best of both worlds, baby!
```
### Command Line Arguments ### Command Line Arguments
``` ```
@@ -243,6 +264,7 @@ When running with default settings, the scraper creates:
3. `review_images/` - Directory containing downloaded images: 3. `review_images/` - Directory containing downloaded images:
- `review_images/profiles/` - Profile pictures - `review_images/profiles/` - Profile pictures
- `review_images/reviews/` - Review images - `review_images/reviews/` - Review images
4. **S3 Bucket** (when enabled) - Images uploaded to your configured S3 bucket with custom folder structure
## 🔄 Integration Examples ## 🔄 Integration Examples
@@ -304,6 +326,12 @@ print(f"Reviews with images: {len(reviews_with_images)}")
- Run with `sudo` if you're getting permission errors (not ideal but gets the job done) - Run with `sudo` if you're getting permission errors (not ideal but gets the job done)
- Some images vanish from Google's CDN faster than your ex. Nothing we can do about that. - Some images vanish from Google's CDN faster than your ex. Nothing we can do about that.
5. **S3 Upload Chaos**
- Double-check your AWS credentials and bucket permissions
- Make sure your bucket exists and is in the specified region
- Check if your bucket policy allows public-read for uploaded objects
- AWS charges for every API call, so don't go crazy with test uploads
### Operation Logs (AKA "What The Hell Is It Doing?") ### Operation Logs (AKA "What The Hell Is It Doing?")
We don't just log, we OBSESSIVELY document the scraper's every breath: We don't just log, we OBSESSIVELY document the scraper's every breath:
@@ -353,11 +381,129 @@ A: Damn straight. We've pulled 50k+ reviews without breaking a sweat. The MongoD
**Q: I found a bug/have a killer feature idea!** **Q: I found a bug/have a killer feature idea!**
A: Jump on GitHub and file an issue or PR. But do your homework first if you're reporting something already in the README, we'll roast you publicly. A: Jump on GitHub and file an issue or PR. But do your homework first if you're reporting something already in the README, we'll roast you publicly.
## ☁️ AWS S3 Setup Guide
Want to store your images in the cloud like a boss? Here's how to set up S3 integration:
### 1. Create an S3 Bucket
1. Log into [AWS Console](https://console.aws.amazon.com/s3/)
2. Click "Create bucket"
3. Choose a unique bucket name (e.g., `your-company-reviews`)
4. Select your preferred region
5. **Important**: Under "Block public access settings" - UNCHECK "Block all public access" if you want images to be publicly accessible
6. Create the bucket
### 2. Set Bucket Permissions
For public image access, add this bucket policy (replace `your-bucket-name`):
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PublicReadGetObject",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::your-bucket-name/*"
}
]
}
```
### 3. Create IAM User for API Access
1. Go to [IAM Console](https://console.aws.amazon.com/iam/)
2. Create a new user with programmatic access
3. Attach this policy (replace `your-bucket-name`):
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::your-bucket-name/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::your-bucket-name"
}
]
}
```
4. Save the Access Key ID and Secret Access Key
### 4. Configure Your Scraper
Update your `config.yaml`:
```yaml
use_s3: true
s3:
aws_access_key_id: "YOUR_ACCESS_KEY_ID"
aws_secret_access_key: "YOUR_SECRET_ACCESS_KEY"
region_name: "us-east-1" # Match your bucket region
bucket_name: "your-bucket-name"
prefix: "google_reviews/"
profiles_folder: "profiles/"
reviews_folder: "reviews/"
delete_local_after_upload: false # Keep local copies
s3_base_url: "" # Leave empty for default AWS URLs
```
### 5. Test Your Setup
Run the included tests to verify everything works:
```bash
# Install dependencies
pip install -r requirements.txt
# Test S3 connection
pytest tests/test_s3_connection.py -v
```
### 6. Folder Structure
Your S3 bucket will organize images like this:
```
your-bucket/
├── google_reviews/
│ ├── profiles/
│ │ ├── user123.jpg
│ │ └── user456.jpg
│ └── reviews/
│ ├── review789.jpg
│ └── review101.jpg
```
### Pro Tips:
- **Cost Optimization**: Enable S3 Intelligent Tiering for automatic cost savings
- **CDN**: Add CloudFront distribution for faster global image delivery
- **Security**: Use IAM roles instead of hardcoded keys in production
- **Monitoring**: Enable S3 access logging to track usage
## 🌐 Links ## 🌐 Links
- [Python Documentation](https://docs.python.org/3/) - [Python Documentation](https://docs.python.org/3/)
- [Selenium Documentation](https://selenium-python.readthedocs.io/) - [Selenium Documentation](https://selenium-python.readthedocs.io/)
- [MongoDB Documentation](https://docs.mongodb.com/) - [MongoDB Documentation](https://docs.mongodb.com/)
- [AWS S3 Documentation](https://docs.aws.amazon.com/s3/)
- [Boto3 Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)
--- ---

View File

@@ -11,6 +11,8 @@ from urllib.parse import urlparse
import requests import requests
from modules.s3_handler import S3Handler
# Logger # Logger
log = logging.getLogger("scraper") log = logging.getLogger("scraper")
@@ -35,6 +37,10 @@ class ImageHandler:
self.profile_dir = self.image_dir / "profiles" self.profile_dir = self.image_dir / "profiles"
self.review_dir = self.image_dir / "reviews" self.review_dir = self.image_dir / "reviews"
# Initialize S3 handler
self.s3_handler = S3Handler(config)
self.use_s3 = config.get("use_s3", False)
def ensure_directories(self): def ensure_directories(self):
"""Ensure all image directories exist""" """Ensure all image directories exist"""
self.profile_dir.mkdir(parents=True, exist_ok=True) self.profile_dir.mkdir(parents=True, exist_ok=True)
@@ -206,6 +212,31 @@ class ImageHandler:
if custom_url: if custom_url:
url_to_custom_url[url] = custom_url url_to_custom_url[url] = custom_url
# Upload to S3 if enabled
s3_url_mapping = {}
if self.use_s3 and self.s3_handler.enabled and url_to_filename:
log.info("Uploading images to S3...")
# Prepare files for S3 upload
files_to_upload = {}
for url, filename in url_to_filename.items():
# Determine if it's a profile image
is_profile = any(url == profile_url for profile_url in profile_urls)
# Get local file path
local_path = (self.profile_dir if is_profile else self.review_dir) / filename
if local_path.exists():
files_to_upload[filename] = (local_path, is_profile)
# Upload to S3
s3_results = self.s3_handler.upload_images_batch(files_to_upload)
# Create mapping from original URL to S3 URL
for url, filename in url_to_filename.items():
if filename in s3_results:
s3_url_mapping[url] = s3_results[filename]
# Update review documents # Update review documents
for review_id, review in reviews.items(): for review_id, review in reviews.items():
# Find the original URLs to use for lookup - important for both user_images and profile_picture # Find the original URLs to use for lookup - important for both user_images and profile_picture
@@ -241,7 +272,10 @@ class ImageHandler:
# Create custom URLs for each image # Create custom URLs for each image
custom_images = [] custom_images = []
for url in user_images_original: for url in user_images_original:
if url in url_to_custom_url: # Prefer S3 URL if available
if url in s3_url_mapping:
custom_images.append(s3_url_mapping[url])
elif url in url_to_custom_url:
custom_images.append(url_to_custom_url[url]) custom_images.append(url_to_custom_url[url])
elif not self.is_not_custom_url(url): # Already a custom URL elif not self.is_not_custom_url(url): # Already a custom URL
custom_images.append(url) custom_images.append(url)
@@ -262,8 +296,10 @@ class ImageHandler:
if self.preserve_original_urls and "original_profile_picture" not in review: if self.preserve_original_urls and "original_profile_picture" not in review:
review["original_profile_picture"] = review["profile_picture"] review["original_profile_picture"] = review["profile_picture"]
# Replace with custom URL if we have one for this profile image # Replace with S3 URL if available, otherwise use custom URL
if profile_picture_original in url_to_custom_url: if profile_picture_original in s3_url_mapping:
review["profile_picture"] = s3_url_mapping[profile_picture_original]
elif profile_picture_original in url_to_custom_url:
review["profile_picture"] = url_to_custom_url[profile_picture_original] review["profile_picture"] = url_to_custom_url[profile_picture_original]
elif not self.is_not_custom_url(review["profile_picture"]): elif not self.is_not_custom_url(review["profile_picture"]):
# If current URL is already a custom URL, keep it # If current URL is already a custom URL, keep it
@@ -277,7 +313,10 @@ class ImageHandler:
review["profile_picture"] = custom_url review["profile_picture"] = custom_url
log.info(f"Downloaded {len(url_to_filename)} images") log.info(f"Downloaded {len(url_to_filename)} images")
if self.use_s3 and s3_url_mapping:
log.info(f"Uploaded {len(s3_url_mapping)} images to S3")
if self.replace_urls: if self.replace_urls:
log.info(f"Replaced URLs for {len(url_to_custom_url)} images") total_replaced = len(s3_url_mapping) + len(url_to_custom_url)
log.info(f"Replaced URLs for {total_replaced} images")
return reviews return reviews

177
modules/s3_handler.py Normal file
View File

@@ -0,0 +1,177 @@
"""
S3 upload handler for Google Maps Reviews Scraper.
"""
import logging
import os
from pathlib import Path
from typing import Dict, Any, Optional
import boto3
from botocore.exceptions import ClientError
log = logging.getLogger("scraper")
class S3Handler:
"""Handler for uploading images to AWS S3"""
def __init__(self, config: Dict[str, Any]):
"""Initialize S3 handler with configuration"""
self.enabled = config.get("use_s3", False)
if not self.enabled:
return
s3_config = config.get("s3", {})
self.aws_access_key_id = s3_config.get("aws_access_key_id", "")
self.aws_secret_access_key = s3_config.get("aws_secret_access_key", "")
self.region_name = s3_config.get("region_name", "us-east-1")
self.bucket_name = s3_config.get("bucket_name", "")
self.prefix = s3_config.get("prefix", "reviews/").rstrip("/") + "/"
self.profiles_folder = s3_config.get("profiles_folder", "profiles/").strip("/")
self.reviews_folder = s3_config.get("reviews_folder", "reviews/").strip("/")
self.delete_local_after_upload = s3_config.get("delete_local_after_upload", False)
self.s3_base_url = s3_config.get("s3_base_url", "")
# Validate required settings
if not self.bucket_name:
log.error("S3 bucket_name is required when use_s3 is enabled")
self.enabled = False
return
# Initialize S3 client
try:
session_kwargs = {"region_name": self.region_name}
# Use credentials if provided, otherwise rely on environment/IAM
if self.aws_access_key_id and self.aws_secret_access_key:
session_kwargs.update({
"aws_access_key_id": self.aws_access_key_id,
"aws_secret_access_key": self.aws_secret_access_key
})
self.s3_client = boto3.client("s3", **session_kwargs)
# Test connection by checking if bucket exists
self.s3_client.head_bucket(Bucket=self.bucket_name)
log.info(f"S3 handler initialized successfully for bucket: {self.bucket_name}")
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', '')
if error_code == '404':
log.error(f"S3 bucket '{self.bucket_name}' not found")
elif error_code == '403':
log.error(f"Access denied to S3 bucket '{self.bucket_name}'")
else:
log.error(f"Error connecting to S3: {e}")
self.enabled = False
except Exception as e:
log.error(f"Error initializing S3 client: {e}")
self.enabled = False
def get_s3_url(self, key: str) -> str:
"""Generate S3 URL for uploaded file"""
if self.s3_base_url:
return f"{self.s3_base_url.rstrip('/')}/{key}"
else:
return f"https://{self.bucket_name}.s3.{self.region_name}.amazonaws.com/{key}"
def upload_file(self, local_path: Path, s3_key: str) -> Optional[str]:
"""
Upload a file to S3.
Args:
local_path: Path to local file
s3_key: S3 key (path) for the uploaded file
Returns:
S3 URL if successful, None if failed
"""
if not self.enabled:
return None
if not local_path.exists():
log.warning(f"Local file does not exist: {local_path}")
return None
try:
# Upload file
self.s3_client.upload_file(
str(local_path),
self.bucket_name,
s3_key,
ExtraArgs={
'ContentType': 'image/jpeg',
'ACL': 'public-read' # Make images publicly readable
}
)
# Generate S3 URL
s3_url = self.get_s3_url(s3_key)
# Delete local file if requested
if self.delete_local_after_upload:
try:
local_path.unlink()
log.debug(f"Deleted local file: {local_path}")
except Exception as e:
log.warning(f"Failed to delete local file {local_path}: {e}")
log.debug(f"Uploaded {local_path} to s3://{self.bucket_name}/{s3_key}")
return s3_url
except ClientError as e:
log.error(f"Failed to upload {local_path} to S3: {e}")
return None
except Exception as e:
log.error(f"Unexpected error uploading {local_path} to S3: {e}")
return None
def upload_image(self, local_path: Path, filename: str, is_profile: bool = False) -> Optional[str]:
"""
Upload an image to S3 with appropriate folder structure.
Args:
local_path: Path to local image file
filename: Name of the file
is_profile: Whether this is a profile image
Returns:
S3 URL if successful, None if failed
"""
if not self.enabled:
return None
# Create S3 key with appropriate folder structure
folder = self.profiles_folder if is_profile else self.reviews_folder
s3_key = f"{self.prefix}{folder}/{filename}"
return self.upload_file(local_path, s3_key)
def upload_images_batch(self, image_files: Dict[str, tuple]) -> Dict[str, str]:
"""
Upload multiple images to S3.
Args:
image_files: Dict mapping filename to (local_path, is_profile) tuple
Returns:
Dict mapping filename to S3 URL for successful uploads
"""
if not self.enabled:
return {}
results = {}
for filename, (local_path, is_profile) in image_files.items():
s3_url = self.upload_image(local_path, filename, is_profile)
if s3_url:
results[filename] = s3_url
if results:
log.info(f"Successfully uploaded {len(results)} images to S3")
return results

View File

@@ -10,3 +10,5 @@ pyyaml==6.0.1
certifi==2024.7.4 certifi==2024.7.4
webdriver-manager==4.0.2 webdriver-manager==4.0.2
setuptools==79.0.1 setuptools==79.0.1
boto3==1.35.1
pytest==7.4.3

54
tests/README.md Normal file
View File

@@ -0,0 +1,54 @@
# Tests
This directory contains pytest tests for the Google Reviews Scraper.
## Running Tests
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Run all tests:
```bash
pytest tests/
```
3. Run specific test files:
```bash
pytest tests/test_mongodb_connection.py
pytest tests/test_s3_connection.py
```
4. Run with verbose output:
```bash
pytest tests/ -v
```
## Test Coverage
### MongoDB Connection Tests (`test_mongodb_connection.py`)
- Tests MongoDB connection when enabled in config
- Validates MongoDB configuration parameters
- Tests basic database operations (insert/find/delete)
- Skips tests when MongoDB is disabled
### S3 Connection Tests (`test_s3_connection.py`)
- Tests S3 connection when enabled in config
- Validates S3 configuration parameters
- Tests file upload/download operations
- Tests S3Handler class initialization
- Skips tests when S3 is disabled
## Configuration
Tests use the main `config.yaml` file in the project root. Make sure your configuration is properly set up:
- For MongoDB tests: Ensure `use_mongodb: true` and valid MongoDB credentials
- For S3 tests: Ensure `use_s3: true` and valid AWS credentials
## Test Results
- Tests will be skipped if the corresponding service (MongoDB/S3) is disabled in config
- Failed connection tests indicate configuration or service availability issues
- All tests should pass when services are properly configured and accessible

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Tests package

39
tests/conftest.py Normal file
View File

@@ -0,0 +1,39 @@
"""
Test configuration and fixtures for Google Reviews Scraper tests.
"""
import pytest
import yaml
from pathlib import Path
@pytest.fixture
def config():
"""Load configuration from config.yaml"""
config_path = Path(__file__).parent.parent / "config.yaml"
with open(config_path, 'r') as f:
return yaml.safe_load(f)
@pytest.fixture
def mongodb_config(config):
"""Extract MongoDB configuration"""
return config.get("mongodb", {})
@pytest.fixture
def s3_config(config):
"""Extract S3 configuration"""
return config.get("s3", {})
@pytest.fixture
def use_mongodb(config):
"""Check if MongoDB is enabled"""
return config.get("use_mongodb", False)
@pytest.fixture
def use_s3(config):
"""Check if S3 is enabled"""
return config.get("use_s3", False)

View File

@@ -0,0 +1,90 @@
"""
Test MongoDB connection functionality.
"""
import pytest
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
class TestMongoDBConnection:
"""Test MongoDB connection and basic operations"""
def test_mongodb_connection_when_enabled(self, use_mongodb, mongodb_config):
"""Test MongoDB connection when MongoDB is enabled in config"""
if not use_mongodb:
pytest.skip("MongoDB is disabled in configuration")
if not mongodb_config:
pytest.fail("MongoDB is enabled but no MongoDB configuration found")
uri = mongodb_config.get("uri")
if not uri:
pytest.fail("MongoDB URI not found in configuration")
try:
# Create MongoDB client with shorter timeout for testing
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
# Test connection by pinging the server
client.admin.command('ping')
# Test database access
database_name = mongodb_config.get("database", "reviews")
db = client[database_name]
# Test collection access
collection_name = mongodb_config.get("collection", "google_reviews")
collection = db[collection_name]
# Verify we can perform basic operations
# Test insert and delete a dummy document
test_doc = {"_id": "test_connection", "test": True}
collection.insert_one(test_doc)
# Verify document was inserted
found_doc = collection.find_one({"_id": "test_connection"})
assert found_doc is not None
assert found_doc["test"] is True
# Clean up test document
collection.delete_one({"_id": "test_connection"})
# Verify document was deleted
found_doc = collection.find_one({"_id": "test_connection"})
assert found_doc is None
client.close()
except ConnectionFailure as e:
pytest.fail(f"Failed to connect to MongoDB: {e}")
except ServerSelectionTimeoutError as e:
pytest.fail(f"MongoDB server selection timeout: {e}")
except Exception as e:
pytest.fail(f"Unexpected error testing MongoDB: {e}")
def test_mongodb_config_validation(self, use_mongodb, mongodb_config):
"""Test that MongoDB configuration is valid when enabled"""
if not use_mongodb:
pytest.skip("MongoDB is disabled in configuration")
# Check required configuration fields
assert "uri" in mongodb_config, "MongoDB URI is required"
assert "database" in mongodb_config, "MongoDB database name is required"
assert "collection" in mongodb_config, "MongoDB collection name is required"
# Validate URI format
uri = mongodb_config["uri"]
assert uri.startswith("mongodb://") or uri.startswith("mongodb+srv://"), "Invalid MongoDB URI format"
# Validate names are not empty
assert mongodb_config["database"].strip(), "Database name cannot be empty"
assert mongodb_config["collection"].strip(), "Collection name cannot be empty"
def test_mongodb_skipped_when_disabled(self, use_mongodb):
"""Test that MongoDB tests are skipped when disabled"""
if use_mongodb:
pytest.skip("MongoDB is enabled, this test is for disabled state")
# This test passes if we reach here, meaning MongoDB is properly disabled
assert True

202
tests/test_s3_connection.py Normal file
View File

@@ -0,0 +1,202 @@
"""
Test S3 connection functionality.
"""
import pytest
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from pathlib import Path
import tempfile
import os
class TestS3Connection:
"""Test S3 connection and basic operations"""
def test_s3_connection_when_enabled(self, use_s3, s3_config):
"""Test S3 connection when S3 is enabled in config"""
if not use_s3:
pytest.skip("S3 is disabled in configuration")
if not s3_config:
pytest.fail("S3 is enabled but no S3 configuration found")
# Validate required configuration
bucket_name = s3_config.get("bucket_name")
if not bucket_name:
pytest.fail("S3 bucket name not found in configuration")
region_name = s3_config.get("region_name", "us-east-1")
try:
# Create S3 client with credentials from config
session_kwargs = {"region_name": region_name}
aws_access_key_id = s3_config.get("aws_access_key_id")
aws_secret_access_key = s3_config.get("aws_secret_access_key")
if aws_access_key_id and aws_secret_access_key:
session_kwargs.update({
"aws_access_key_id": aws_access_key_id,
"aws_secret_access_key": aws_secret_access_key
})
s3_client = boto3.client("s3", **session_kwargs)
# Test bucket access by checking if bucket exists
s3_client.head_bucket(Bucket=bucket_name)
except NoCredentialsError:
pytest.fail("AWS credentials not found. Check your configuration or environment.")
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', '')
if error_code == '404':
pytest.fail(f"S3 bucket '{bucket_name}' not found")
elif error_code == '403':
pytest.fail(f"Access denied to S3 bucket '{bucket_name}'. Check your credentials and permissions.")
else:
pytest.fail(f"S3 client error: {e}")
except Exception as e:
pytest.fail(f"Unexpected error testing S3 connection: {e}")
def test_s3_upload_download_when_enabled(self, use_s3, s3_config):
"""Test S3 upload and download functionality"""
if not use_s3:
pytest.skip("S3 is disabled in configuration")
if not s3_config:
pytest.fail("S3 is enabled but no S3 configuration found")
bucket_name = s3_config.get("bucket_name")
if not bucket_name:
pytest.fail("S3 bucket name not found in configuration")
region_name = s3_config.get("region_name", "us-east-1")
prefix = s3_config.get("prefix", "reviews/").rstrip("/") + "/"
profiles_folder = s3_config.get("profiles_folder", "profiles/").strip("/")
reviews_folder = s3_config.get("reviews_folder", "reviews/").strip("/")
try:
# Create S3 client
session_kwargs = {"region_name": region_name}
aws_access_key_id = s3_config.get("aws_access_key_id")
aws_secret_access_key = s3_config.get("aws_secret_access_key")
if aws_access_key_id and aws_secret_access_key:
session_kwargs.update({
"aws_access_key_id": aws_access_key_id,
"aws_secret_access_key": aws_secret_access_key
})
s3_client = boto3.client("s3", **session_kwargs)
# Create a temporary test file
test_content = b"This is a test file for S3 upload"
# Test with reviews folder structure
test_key = f"{prefix}{reviews_folder}/test_file.txt"
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(test_content)
tmp_file_path = tmp_file.name
try:
# Test upload
s3_client.upload_file(
tmp_file_path,
bucket_name,
test_key,
ExtraArgs={'ACL': 'public-read'}
)
# Test that file exists in S3
s3_client.head_object(Bucket=bucket_name, Key=test_key)
# Test download
with tempfile.NamedTemporaryFile(delete=False) as download_file:
download_path = download_file.name
s3_client.download_file(bucket_name, test_key, download_path)
# Verify downloaded content matches uploaded content
with open(download_path, 'rb') as f:
downloaded_content = f.read()
assert downloaded_content == test_content, "Downloaded content doesn't match uploaded content"
# Clean up S3 object
s3_client.delete_object(Bucket=bucket_name, Key=test_key)
finally:
# Clean up temporary files
if os.path.exists(tmp_file_path):
os.unlink(tmp_file_path)
if os.path.exists(download_path):
os.unlink(download_path)
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', '')
if error_code == '403':
pytest.fail(f"Access denied during S3 operations. Check your permissions.")
else:
pytest.fail(f"S3 operation failed: {e}")
except Exception as e:
pytest.fail(f"Unexpected error during S3 test: {e}")
def test_s3_config_validation(self, use_s3, s3_config):
"""Test that S3 configuration is valid when enabled"""
if not use_s3:
pytest.skip("S3 is disabled in configuration")
# Check required configuration fields
assert "bucket_name" in s3_config, "S3 bucket_name is required"
assert s3_config["bucket_name"].strip(), "S3 bucket_name cannot be empty"
# Check optional fields have reasonable defaults
region_name = s3_config.get("region_name", "us-east-1")
assert region_name.strip(), "S3 region_name cannot be empty"
# Validate prefix format if provided
prefix = s3_config.get("prefix", "")
if prefix and not prefix.endswith("/"):
# This is not an error, but log a warning that prefix should end with "/"
pass
def test_s3_skipped_when_disabled(self, use_s3):
"""Test that S3 tests are skipped when disabled"""
if use_s3:
pytest.skip("S3 is enabled, this test is for disabled state")
# This test passes if we reach here, meaning S3 is properly disabled
assert True
def test_s3_handler_initialization(self, config):
"""Test S3Handler class initialization with current config"""
try:
# Import the S3Handler class
import sys
sys.path.append(str(Path(__file__).parent.parent))
from modules.s3_handler import S3Handler
# Test initialization
s3_handler = S3Handler(config)
# Check that handler respects the use_s3 setting
expected_enabled = config.get("use_s3", False)
assert s3_handler.enabled == expected_enabled, f"S3Handler enabled state should match config use_s3 setting"
if expected_enabled:
# If S3 is enabled, check that configuration was loaded
s3_config = config.get("s3", {})
bucket_name = s3_config.get("bucket_name", "")
if bucket_name:
assert s3_handler.bucket_name == bucket_name, "S3Handler should load bucket name from config"
else:
# If no bucket name, handler should be disabled
assert not s3_handler.enabled, "S3Handler should be disabled when bucket_name is missing"
except ImportError:
pytest.fail("Could not import S3Handler class")
except Exception as e:
pytest.fail(f"Error testing S3Handler initialization: {e}")