Skip to content

Parallelization Reference

sqldeps.parallel

Parallel processing utilities for SQL dependency extraction.

This module provides functions for extracting SQL dependencies in parallel using multiple worker processes, with shared rate limiting.

process_files_in_parallel(sql_files, framework='groq', model=None, prompt_path=None, n_workers=1, rpm=100, use_cache=True)

Extract SQL dependencies from SQL files in parallel with rate limiting.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `sql_files` | `list[Path]` | List of paths to the SQL files to process | *required* |
| `framework` | `str` | LLM framework to use (e.g., groq, openai, deepseek) | `'groq'` |
| `model` | `str \| None` | Model name within the selected framework | `None` |
| `prompt_path` | `Path \| None` | Path to a custom prompt YAML file | `None` |
| `n_workers` | `int` | Number of worker processes to use (-1 for all available CPUs); never more than the number of files | `1` |
| `rpm` | `int` | Requests-per-minute limit shared across all workers | `100` |
| `use_cache` | `bool` | Whether to use cached results | `True` |

Returns:

| Type | Description |
|------|-------------|
| `dict` | Dictionary mapping file paths to SQLProfile objects |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If `n_workers` is invalid, no SQL files are provided, or no dependencies could be extracted from any SQL file |

Source code in sqldeps/parallel.py
def process_files_in_parallel(
    sql_files: list[Path],
    framework: str = "groq",
    model: str | None = None,
    prompt_path: Path | None = None,
    n_workers: int = 1,
    rpm: int = 100,
    use_cache: bool = True,
) -> dict:
    """Extract SQL dependencies from SQL files in parallel with rate limiting.

    The files are split into contiguous batches, one per worker process, and
    every worker is handed the same rate limiter so the combined request rate
    stays within ``rpm``. A batch that raises is logged (with traceback) and
    skipped; results from the remaining batches are still returned, and a
    warning is logged when the returned results are incomplete.

    Args:
        sql_files: List of Paths to SQL files to process
        framework: LLM framework to use (e.g., groq, openai, deepseek)
        model: Model name within the selected framework
        prompt_path: Path to custom prompt YAML file
        n_workers: Number of worker processes to use (-1 for all available
            CPUs). It is capped at the number of files.
        rpm: Requests per minute limit across all workers
        use_cache: Whether to use cached results

    Returns:
        Dictionary mapping file paths to SQLProfile objects

    Raises:
        ValueError: If ``n_workers`` is invalid, no SQL files are provided,
            or no dependencies could be extracted from any SQL file
    """
    # Validate and resolve the requested worker count (-1 -> all CPUs).
    n_workers = resolve_workers(n_workers)

    # Ensure we have a list of Path objects
    sql_files = [Path(f) for f in sql_files]

    if not sql_files:
        raise ValueError("No SQL files provided")

    # Don't use more workers than files. Clamp *before* logging so the log
    # reports the number of worker processes actually started.
    n_workers = min(n_workers, len(sql_files))

    logger.info(f"Processing {len(sql_files)} SQL files")
    logger.info(
        f"Using {n_workers} workers with global rate limit of {rpm} requests per minute"
    )
    logger.info(f"Cache usage: {'enabled' if use_cache else 'disabled'}")

    # Split files into contiguous batches, one per worker. Because
    # n_workers <= len(sql_files) no batch should be empty; the filter is a guard.
    batches = np.array_split(sql_files, n_workers)
    batches = [list(batch) for batch in batches if len(batch) > 0]

    all_results = {}
    failed_batches = 0

    # Create shared rate limiter
    with Manager() as manager:
        rate_limiter = MultiprocessingRateLimiter(manager, rpm)

        # Process batches in parallel
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            process_func = partial(
                _process_batch_files,
                rate_limiter=rate_limiter,
                framework=framework,
                model=model,
                prompt_path=prompt_path,
                use_cache=use_cache,
            )

            futures = {
                executor.submit(process_func, batch): i
                for i, batch in enumerate(batches)
            }

            for future in as_completed(futures):
                batch_idx = futures[future]
                try:
                    batch_results = future.result()
                except Exception as e:
                    # Best-effort: keep processing the other batches, but keep
                    # the traceback so the failure can be diagnosed.
                    failed_batches += 1
                    logger.exception(
                        f"Batch {batch_idx + 1} failed "
                        f"({len(batches[batch_idx])} files): {e}"
                    )
                    continue

                all_results.update(batch_results)
                logger.info(
                    f"Completed batch {batch_idx + 1}/{len(batches)} with "
                    f"{len(batch_results)} results"
                )

    # If no results were extracted
    if not all_results:
        raise ValueError("No dependencies could be extracted from any SQL file")

    # Partial success: make it visible that some files were not processed.
    if failed_batches:
        logger.warning(
            f"{failed_batches} of {len(batches)} batches failed; "
            "returned results are incomplete"
        )

    return all_results

resolve_workers(n_workers)

Resolve the number of worker processes to use.

Parameters:

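| Name | Type | Description | Default |
|------|------|-------------|---------|
| `n_workers` | `int` | Requested number of workers: -1 for all available CPUs, or a specific count between 1 and the CPU count | *required* |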

Returns:

| Type | Description |
|------|-------------|
| `int` | Actual number of worker processes to use |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If `n_workers` is neither -1 nor between 1 and `cpu_count()` |

Source code in sqldeps/parallel.py
def resolve_workers(n_workers: int) -> int:
    """Translate a requested worker count into the number of processes to start.

    Args:
        n_workers: Requested number of workers. -1 means one per available
            CPU; any other value must lie between 1 and the CPU count.

    Returns:
        int: The number of worker processes to use.

    Raises:
        ValueError: If n_workers is neither -1 nor in the range 1..cpu_count().
    """
    available = cpu_count()

    # Sentinel: use every available CPU.
    if n_workers == -1:
        return available

    # Anything outside 1..available is rejected.
    if not 1 <= n_workers <= available:
        raise ValueError(
            f"Invalid worker count: {n_workers}. "
            f"Must be -1 (all), 1 (single), or up to {available}."
        )

    return n_workers