La mayoría de los problemas de integración en empresas no se resuelven con un conector pre-construido. Tu ERP tiene una estructura de datos específica, tu e-commerce maneja su propio formato de pedidos y tu CRM almacena clientes de forma diferente. Lo que necesitas es una API que traduzca entre ellos — y FastAPI es la herramienta perfecta para construirla.
En esta guía vas a crear una API REST profesional con Python y FastAPI que incluye validación de datos, autenticación JWT, conexión a PostgreSQL, manejo de errores y documentación automática — lista para producción.
Requisitos previos
Python 3.11+ y PostgreSQL instalados. Si necesitas instalar PostgreSQL, sigue nuestra guía de instalación.
python3 --version # 3.11+
psql --version # 16+
Paso 1: Estructura del proyecto
mi-api/
├── app/
│ ├── __init__.py
│ ├── main.py # Punto de entrada de la API
│ ├── config.py # Configuración y variables de entorno
│ ├── database.py # Conexión a PostgreSQL
│ ├── auth.py # Autenticación JWT
│ ├── models/
│ │ ├── __init__.py
│ │ └── producto.py # Modelo SQLAlchemy
│ ├── schemas/
│ │ ├── __init__.py
│ │ └── producto.py # Schemas Pydantic (validación)
│ └── routers/
│ ├── __init__.py
│ └── productos.py # Endpoints de productos
├── .env
├── requirements.txt
└── Dockerfile
Crea el entorno virtual e instala dependencias:
mkdir mi-api && cd mi-api
python3 -m venv venv
source venv/bin/activate
pip install fastapi uvicorn[standard] sqlalchemy[asyncio] asyncpg \
pydantic-settings python-jose[cryptography] passlib[bcrypt] python-multipart
Guarda las dependencias:
pip freeze > requirements.txt
Paso 2: Configuración
# app/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
app_name: str = "Mi API"
database_url: str = "postgresql+asyncpg://app:password@localhost:5432/miapi"
secret_key: str = "cambiar-en-produccion-con-valor-seguro"
access_token_expire_minutes: int = 60
class Config:
env_file = ".env"
settings = Settings()
# .env
DATABASE_URL=postgresql+asyncpg://app:tu_contraseña@localhost:5432/miapi
SECRET_KEY=genera-un-valor-aleatorio-aqui-de-32-chars
ACCESS_TOKEN_EXPIRE_MINUTES=60
Paso 3: Conexión a PostgreSQL
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
engine = create_async_engine(settings.database_url, echo=False)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_db():
"""Dependency que provee una sesión de DB a cada request."""
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Paso 4: Modelo y schema
El modelo define la estructura en la base de datos. El schema define la estructura de la API (lo que el cliente envía y recibe):
# app/models/producto.py
from sqlalchemy import String, Numeric, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class Producto(Base):
__tablename__ = "productos"
id: Mapped[int] = mapped_column(primary_key=True)
sku: Mapped[str] = mapped_column(String(50), unique=True, index=True)
nombre: Mapped[str] = mapped_column(String(200))
descripcion: Mapped[str | None] = mapped_column(String(1000))
precio: Mapped[float] = mapped_column(Numeric(12, 2))
stock: Mapped[int] = mapped_column(default=0)
activo: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[str] = mapped_column(DateTime, server_default=func.now())
updated_at: Mapped[str] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now())
# app/schemas/producto.py
from pydantic import BaseModel, Field
from datetime import datetime
class ProductoBase(BaseModel):
sku: str = Field(..., min_length=1, max_length=50, examples=["SKU-001"])
nombre: str = Field(..., min_length=1, max_length=200)
descripcion: str | None = None
precio: float = Field(..., gt=0, examples=[299.99])
stock: int = Field(default=0, ge=0)
activo: bool = True
class ProductoCreate(ProductoBase):
pass
class ProductoUpdate(BaseModel):
nombre: str | None = None
descripcion: str | None = None
precio: float | None = Field(default=None, gt=0)
stock: int | None = Field(default=None, ge=0)
activo: bool | None = None
class ProductoResponse(ProductoBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class PaginatedResponse(BaseModel):
items: list[ProductoResponse]
total: int
page: int
per_page: int
pages: int
¿Por qué separar modelo y schema?
El modelo define cómo se almacenan los datos en PostgreSQL. El schema define cómo la API recibe y devuelve datos. Esto te permite exponer solo los campos que el cliente necesita, validar inputs con reglas de negocio (precio positivo, SKU único) y tener diferentes schemas para crear, actualizar y responder.
Paso 5: Autenticación JWT
# app/auth.py
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.config import settings
security = HTTPBearer()
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.secret_key, algorithm="HS256")
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
token = credentials.credentials
try:
payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado",
)
Paso 6: Endpoints CRUD
# app/routers/productos.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.database import get_db
from app.auth import verify_token
from app.models.producto import Producto
from app.schemas.producto import (
ProductoCreate, ProductoUpdate, ProductoResponse, PaginatedResponse
)
router = APIRouter(prefix="/api/v1/productos", tags=["Productos"])
@router.get("", response_model=PaginatedResponse)
async def listar_productos(
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
activo: bool | None = None,
buscar: str | None = None,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
):
"""Listar productos con paginación, filtros y búsqueda."""
query = select(Producto)
if activo is not None:
query = query.where(Producto.activo == activo)
if buscar:
query = query.where(Producto.nombre.ilike(f"%{buscar}%"))
# Total
count_query = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_query)).scalar() or 0
# Paginación
query = query.offset((page - 1) * per_page).limit(per_page)
result = await db.execute(query)
items = result.scalars().all()
return PaginatedResponse(
items=items, total=total, page=page,
per_page=per_page, pages=-(-total // per_page),
)
@router.get("/{sku}", response_model=ProductoResponse)
async def obtener_producto(
sku: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
):
"""Obtener un producto por SKU."""
result = await db.execute(select(Producto).where(Producto.sku == sku))
producto = result.scalar_one_or_none()
if not producto:
raise HTTPException(status_code=404, detail=f"Producto {sku} no encontrado")
return producto
@router.post("", response_model=ProductoResponse, status_code=201)
async def crear_producto(
data: ProductoCreate,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
):
"""Crear un nuevo producto."""
# Verificar SKU único
existing = await db.execute(select(Producto).where(Producto.sku == data.sku))
if existing.scalar_one_or_none():
raise HTTPException(status_code=409, detail=f"SKU {data.sku} ya existe")
producto = Producto(**data.model_dump())
db.add(producto)
await db.flush()
await db.refresh(producto)
return producto
@router.patch("/{sku}", response_model=ProductoResponse)
async def actualizar_producto(
sku: str,
data: ProductoUpdate,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
):
"""Actualizar campos de un producto existente."""
result = await db.execute(select(Producto).where(Producto.sku == sku))
producto = result.scalar_one_or_none()
if not producto:
raise HTTPException(status_code=404, detail=f"Producto {sku} no encontrado")
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(producto, field, value)
await db.flush()
await db.refresh(producto)
return producto
@router.delete("/{sku}", status_code=204)
async def eliminar_producto(
sku: str,
db: AsyncSession = Depends(get_db),
_: dict = Depends(verify_token),
):
"""Eliminar un producto por SKU."""
result = await db.execute(select(Producto).where(Producto.sku == sku))
producto = result.scalar_one_or_none()
if not producto:
raise HTTPException(status_code=404, detail=f"Producto {sku} no encontrado")
await db.delete(producto)
Paso 7: Punto de entrada principal
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import engine, Base
from app.auth import create_access_token
from app.routers import productos
@asynccontextmanager
async def lifespan(app: FastAPI):
# Crear tablas al arrancar (en producción usa Alembic para migraciones)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
await engine.dispose()
app = FastAPI(
title=settings.app_name,
description="API de integración para sincronización de productos entre sistemas",
version="1.0.0",
lifespan=lifespan,
)
# CORS — ajustar en producción
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Routers
app.include_router(productos.router)
@app.get("/health", tags=["Sistema"])
async def health_check():
"""Verificar que la API está funcionando."""
return {"status": "ok", "app": settings.app_name}
@app.post("/auth/token", tags=["Autenticación"])
async def login(api_key: str):
"""
Generar un token JWT.
En producción, valida contra una base de datos de usuarios/API keys.
"""
# Ejemplo simplificado — en producción valida contra DB
if api_key != "mi-api-key-secreta":
from fastapi import HTTPException
raise HTTPException(status_code=401, detail="API key inválida")
token = create_access_token({"sub": "sistema-ecommerce", "role": "integration"})
return {"access_token": token, "token_type": "bearer"}
Paso 8: Ejecutar y probar
# Crear la base de datos
sudo -u postgres psql -c "CREATE DATABASE miapi OWNER app;"
# Arrancar la API
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
Abre http://localhost:8000/docs — verás la documentación interactiva de Swagger generada automáticamente con todos tus endpoints, schemas y la opción de probar cada uno desde el navegador.
Probar con curl
# 1. Obtener token
TOKEN=$(curl -s -X POST "http://localhost:8000/auth/token?api_key=mi-api-key-secreta" | python3 -c "import sys,json; print(json.load(sys.stdin)['access_token'])")
# 2. Crear producto
curl -X POST http://localhost:8000/api/v1/productos \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"sku":"LAPTOP-001","nombre":"Laptop Empresarial","precio":18999.00,"stock":50}'
# 3. Listar productos
curl -H "Authorization: Bearer $TOKEN" "http://localhost:8000/api/v1/productos?page=1&per_page=10"
# 4. Actualizar stock
curl -X PATCH http://localhost:8000/api/v1/productos/LAPTOP-001 \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"stock":45}'
Paso 9: Containerizar con Docker
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ app/
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Con Docker Compose:
# docker-compose.yml
services:
api:
build: .
restart: unless-stopped
ports:
- "8000:8000"
environment:
DATABASE_URL: postgresql+asyncpg://app:${DB_PASSWORD}@db:5432/miapi
SECRET_KEY: ${SECRET_KEY}
depends_on:
db:
condition: service_healthy
db:
image: postgres:16-alpine
restart: unless-stopped
environment:
POSTGRES_DB: miapi
POSTGRES_USER: app
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- pg_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U app -d miapi"]
interval: 10s
timeout: 5s
retries: 5
volumes:
pg_data:
Siguientes pasos
Con la API funcionando, puedes extenderla:
- Alembic — migraciones de base de datos versionadas (indispensable para producción)
- Rate limiting — proteger la API con Redis y
slowapi - Webhooks — notificar a otros sistemas cuando un producto cambia (patrón pub/sub)
- Tests —
pytest+httpxpara tests de integración de cada endpoint - Nginx reverse proxy — exponer la API con HTTPS en producción
- Desarrollo a medida — si necesitas una integración más compleja, te ayudamos
Integraciones a medida
¿Necesitas conectar tu ERP con otros sistemas?
Desarrollamos APIs e integraciones que sincronizan datos entre tu ERP, CRM, e-commerce y cualquier plataforma en tiempo real.



