import base64
import csv
import io
import random
import string
from io import StringIO
from pathlib import Path
from typing import Type, TypeVar

import pandas as pd
import polars as pl
import shortuuid
from fastapi import HTTPException, status
from fastapi.responses import StreamingResponse
from slugify import slugify
from sqlalchemy.orm import Session

# from src.apps.site.models.site_settings import SiteSetting
from src.core.models.base import Base
from src.utils.enums import Separator
from src.utils.helpers.auth import pwd_context

ModelType = TypeVar("ModelType", bound=Base)


def generate_secure_id(
    db: Session,
    instance: Type[ModelType],
    length: int = 10,
    prepend: str = "",
    separator: str = "_",
    new_secure_id: str | None = None,
) -> str:
    """Function to return a random cryptogrqaphically secure id

    @Params:
    prepend: string - Anything that needs to be prepended to the id
    length: int - Desired length of id to generate
    separator: string - A character set to join the prepended text with random id

    @Output:
    string
    """
    if new_secure_id is not None:
        secure_id = new_secure_id
    else:
        secure_id = separator.join(
            [
                prepend.lower(),
                shortuuid.ShortUUID().random(length=(length - (len(prepend) + len(separator)))),
            ]
        )

    qs_exists = db.query(instance).filter(instance.session_id == secure_id).first()
    if qs_exists:
        # Create a new slug with a random suffix
        random_suffix = random_string_generator(size=4)
        new_slug = f"{secure_id}-{random_suffix}"
        return generate_secure_id(
            db=db, instance=instance, length=length, prepend=prepend, separator=separator, new_slug=new_slug
        )

    return secure_id


def random_string_generator(size: int = 5) -> str:
    """
    Generate a cryptographically secure random string of specified length.

    Args:
        size: Length of the random string to generate. Defaults to 5.

    Returns:
        A random string containing uppercase letters and digits.
    """
    return "".join(random.choices(string.ascii_uppercase + string.digits, k=size))


def generate_unique_slug(
    db: Session, instance: Type[ModelType], slug_label: str | None = None, new_slug: str | None = None
) -> str:
    """
    Generate a unique slug for database models with slug fields.

    This function is designed for FastAPI/SQLAlchemy projects and ensures
    slug uniqueness by appending a random string when collisions occur.

    Args:
        db: SQLAlchemy database session.
        instance: The model class to query against.
        slug_label: Text to be converted into a slug. Required if new_slug is None.
        new_slug: Pre-generated slug to check for uniqueness. Optional.

    Returns:
        A unique slug string safe for database storage.

    Raises:
        ValueError: If both slug_label and new_slug are None.
    """
    if new_slug is not None:
        slug = new_slug
    elif slug_label is not None:
        slug = slugify(slug_label)
    else:
        raise ValueError("Either slug_label or new_slug must be provided")

    # Check if the slug already exists in the database
    qs_exists = db.query(instance).filter(instance.slug == slug).first()
    if qs_exists:
        # Create a new slug with a random suffix
        random_suffix = random_string_generator(size=4)
        new_slug = f"{slug}-{random_suffix}"
        return generate_unique_slug(db=db, instance=instance, new_slug=new_slug, slug_label=None)

    return slug


def get_password_hash(password):
    return pwd_context.hash(password)


def generate_unique_code() -> str:
    """Generate a unique code consisting of 4 uppercase letters followed by 8 digits."""
    letters = "".join(random.choices(string.ascii_uppercase, k=4))
    digits = "".join(random.choices(string.digits, k=6))
    slug = letters + digits

    return slug


def get_site_settings_value(db: Session, key: str, default=None):
    """Get a setting value from the settings object."""
    TOKEN_VALUE = default
    # site_setting = db.query(SiteSetting).filter(SiteSetting.key == key).first()
    # if site_setting and site_setting.value:
    #     try:
    #         TOKEN_VALUE = site_setting.value
    #     except ValueError:
    #         pass
    return TOKEN_VALUE


async def generate_csv(columns, data):
    """Generate a CSV file with headers using pandas and return it."""
    # Convert data (list of objects) to list of dicts
    dict_data = [{col: getattr(row, col, "") for col in columns} for row in data]
    df = pd.DataFrame(dict_data, columns=columns)
    output = StringIO()
    df.to_csv(output, index=False)
    output.seek(0)
    return output


def truncate_to_words(text, max_words=120):
    words = text.split()
    return " ".join(words[:max_words])


def get_file_header(file_path: str, custom_delimiter: str = None, rows: int = 0) -> list:
    ext = Path(file_path).suffix.lower()

    # try:
    custom_delimiter = "\t"  # Separator[custom_delimiter].value
    print(f"Using custom delimiter: {custom_delimiter}")
    try:
        read_csv_kwargs = dict(
            n_rows=rows,
            infer_schema_length=10000,  # Increase to better infer types
            encoding='utf-8-lossy',
            ignore_errors=True,  # Ignore row errors
            null_values=["", "NV"]  # Treat 'NV' as null
        )
        if ext == ".parquet":
            df = pl.read_parquet(file_path, n_rows=rows)
        elif ext == ".json":
            df = pl.read_json(file_path, infer_schema_length=rows)
        elif ext == ".ipc":
            df = pl.read_ipc(file_path, n_rows=rows)
        elif ext in [".xlsx", ".xls"]:
            df = pl.read_excel(file_path, n_rows=rows)
        elif ext == ".tsv":
            df = pl.read_csv(file_path, separator="\t", **read_csv_kwargs)
        elif ext == ".txt":
            df = pl.read_csv(file_path, separator=custom_delimiter, **read_csv_kwargs)
        else:
            df = pl.read_csv(file_path, **read_csv_kwargs)
    except Exception as e:
        # Try with different encoding if first attempt fails
        if ext in [".csv", ".tsv", ".txt"]:
            try:
                df = pl.read_csv(
                    file_path,
                    separator="\t" if ext == ".tsv" else custom_delimiter if ext == ".txt" else ",",
                    n_rows=rows,
                    infer_schema_length=10000,
                    encoding='latin-1',
                    ignore_errors=True,
                    null_values=["", "NV"]
                )
            except Exception as e2:
                raise ValueError(f"Could not read header from file: {file_path}\nError: {e2}")
        else:
            raise ValueError(f"Could not read header from file: {file_path}\nError: {e}")
    if rows > 0:
        header = df.columns
        objects = df.rows()
        result = [header] + [list(row) for row in objects]
        return result
    return df.columns
    # except Exception as e:
    #     raise ValueError(f"Could not read header from file: {file_path}\nError: {e}")


def cleanup(text: str) -> str:
    # Example cleanup: strip extra whitespace and replace double spaces
    return " ".join(text.strip().split())


def join_pattern(wine_keyword) -> str | None:
    if not wine_keyword:
        return None

    parts = []
    STR_AND = " AND "

    if getattr(wine_keyword, "producer_keyword", None):
        parts.append(wine_keyword.producer_keyword.strip())

    if getattr(wine_keyword, "base_keyword", None):
        parts.append(STR_AND + wine_keyword.base_keyword.strip())

    if getattr(wine_keyword, "appellation_keyword", None):
        parts.append(STR_AND + wine_keyword.appellation_keyword.strip())

    if getattr(wine_keyword, "color_keyword", None):
        parts.append(wine_keyword.color_keyword.strip())

    if getattr(wine_keyword, "common_modifiers_keyword", None):
        val = wine_keyword.common_modifiers_keyword.strip()
        parts.append(val)
        parts.append(val)

    if getattr(wine_keyword, "modifiers_keyword", None):
        val = wine_keyword.modifiers_keyword.strip()
        parts.extend([val] * 6)

    if getattr(wine_keyword, "custom_not_keyword", None):
        parts.append(wine_keyword.custom_not_keyword.strip())

    if getattr(wine_keyword, "global_not_keyword", None):
        parts.append(wine_keyword.global_not_keyword.strip())

    return cleanup("".join(parts))
