Migrating JSONB Extraction from Python to dbt
Overview
This guide shows how to replace scripts/datasources/gemini/load_meeting_transcripts_bronze.py with dbt incremental models.
Why Migrate to dbt?
Current approach (Python):
- ❌ Separate tool/language from main transformation pipeline
- ❌ Manual UPSERT logic with psycopg2
- ❌ Harder to track lineage
- ❌ No built-in testing framework
dbt approach:
- ✅ Single tool for all transformations
- ✅ SQL-based JSONB extraction (simpler to maintain)
- ✅ Incremental processing built-in
- ✅ Automatic lineage tracking
- ✅ Built-in testing and documentation
- ✅ Version control friendly
What CAN Be Migrated
✅ load_meeting_transcripts_bronze.py - JSONB extraction to bronze tables
- bronze_contacts
- bronze_decisions
- bronze_topics
- bronze_organizations_meetings
- bronze_bills
- bronze_causes
- bronze_financial_items
What CANNOT Be Migrated
❌ load_meeting_transcripts.py - Calls Gemini API (data loading, not transformation)
❌ check_models_used.py - Reporting script
❌ analyze_with_multi_models.py - Calls Gemini API
❌ migrations/cleanup_null_records.py - One-time migration
Migration Steps
1. Create dbt Bronze Models
Created example models (see models/bronze/):
bronze_contacts_from_ai.sql- Extract people from JSONBbronze_decisions_from_ai.sql- Extract decisions from JSONB
Pattern for other tables:
{{
config(
materialized='incremental',
unique_key='source_event_id_<entity>_id',
schema='bronze',
tags=['bronze', 'incremental', 'ai-extraction']
)
}}
WITH source_events AS (
SELECT
id as event_id,
structured_analysis,
ai_model,
created_at
FROM {{ source('bronze', 'bronze_events_analysis_ai') }}
WHERE structured_analysis IS NOT NULL
{% if is_incremental() %}
AND created_at > (SELECT MAX(extracted_at) FROM {{ this }})
{% endif %}
),
entities_unnested AS (
SELECT
event_id,
ai_model,
jsonb_array_elements(structured_analysis->'<entity_key>') as entity_data,
created_at as extracted_at
FROM source_events
WHERE structured_analysis ? '<entity_key>'
)
SELECT
-- Extract fields from JSONB...
FROM entities_unnested
2. Update Staging Models
OLD (reads from bronze tables created by Python):
SELECT * FROM {{ source('bronze', 'bronze_contacts') }}
NEW (reads from dbt bronze models):
SELECT * FROM {{ ref('bronze_contacts_from_ai') }}
3. Update Workflow
OLD:
# Step 1: Run Gemini API analysis
python scripts/datasources/gemini/load_meeting_transcripts.py
# Step 2: Extract JSONB to bronze tables (Python)
python scripts/datasources/gemini/load_meeting_transcripts_bronze.py
# Step 3: Transform with dbt
cd dbt_project
dbt run --select stg_bronze_contacts+
NEW:
# Step 1: Run Gemini API analysis (still Python)
python scripts/datasources/gemini/load_meeting_transcripts.py
# Step 2: Extract JSONB with dbt (incremental)
cd dbt_project
dbt run --select bronze_contacts_from_ai+
# Or run all bronze extractions:
dbt run --select tag:ai-extraction+
4. First-Time Setup
Initial load (one time):
# Let Python script create initial bronze tables
python scripts/datasources/gemini/load_meeting_transcripts_bronze.py
# OR let dbt create them fresh:
cd dbt_project
dbt run --select tag:ai-extraction --full-refresh
Going forward (incremental):
cd dbt_project
dbt run --select tag:ai-extraction
# Only processes new records added since last run
JSONB Extraction Patterns
Simple Array Extraction
-- Extract people array
jsonb_array_elements(structured_analysis->'people') as person_data
-- Get fields
person_data->>'full_name' as full_name -- Text
person_data->>'is_lobbyist' as is_lobbyist -- Still text, cast later
(person_data->>'is_lobbyist')::boolean -- Cast to boolean
Nested JSONB (Keep as JSONB)
-- Keep complex structures as JSONB
person_data->'lobbyist_clients' as lobbyist_clients, -- JSONB
decision_data->'vote_tally' as vote_tally, -- JSONB
decision_data->'frame_analysis' as frame_analysis -- JSONB
Date/Numeric Casting
(decision_data->>'decision_date')::date as decision_date,
(decision_data->>'year')::integer as year,
(financial_data->>'amount')::numeric as amount
Handling NULL/Missing Fields
-- Use COALESCE for defaults
COALESCE((person_data->>'is_lobbyist')::boolean, FALSE) as is_lobbyist,
-- Check if key exists
WHERE structured_analysis ? 'people'
Incremental Model Strategy
dbt will:
- Check if target table exists
- If first run → create full table
- If subsequent run → only process new source records
- Use
unique_keyto deduplicate (UPSERT behavior)
{% if is_incremental() %}
AND created_at > (SELECT MAX(extracted_at) FROM {{ this }})
{% endif %}
Testing
Add tests in _bronze.yml:
models:
- name: bronze_contacts_from_ai
description: "Contacts extracted from Gemini AI analysis JSONB"
tests:
- dbt_utils.recency:
datepart: day
field: extracted_at
interval: 7
columns:
- name: person_id
tests:
- not_null
- name: full_name
tests:
- not_null
Performance
Python script:
- Loads entire events_text_ai table into memory
- Processes all rows every run
- Manual batching required
dbt incremental:
- Only processes new records
- SQL-based (database does the work)
- No memory constraints
- Parallel execution possible
Rollback Plan
If needed, can run both in parallel:
# Old way (Python)
python scripts/datasources/gemini/load_meeting_transcripts_bronze.py \
--table bronze_contacts_python
# New way (dbt)
dbt run --select bronze_contacts_from_ai
# Compare results
psql -c "SELECT COUNT(*) FROM bronze.bronze_contacts_python"
psql -c "SELECT COUNT(*) FROM bronze.bronze_contacts_from_ai"
Next Steps
- ✅ Review example models (
bronze_contacts_from_ai.sql,bronze_decisions_from_ai.sql) - Create remaining bronze models:
bronze_topics_from_ai.sqlbronze_organizations_meetings_from_ai.sqlbronze_bills_from_ai.sqlbronze_causes_from_ai.sqlbronze_financial_items_from_ai.sql
- Update staging models to reference new dbt models
- Test incremental runs
- Deprecate Python extraction script
- Update documentation
Questions?
- How do I handle deduplication? → Use
unique_keyconfig - What if JSONB structure changes? → Update the model SQL
- Can I still use Python for initial load? → Yes, for backward compatibility
- How do I backfill? → Use
dbt run --select <model> --full-refresh