HubSpot Contacts CRM Connector

This compact connector server receives enriched NSPS events and syncs the relevant account and SIM data to HubSpot Contacts CRM. It is built with Python and FastAPI.

Key Features

  • FastAPI HTTP /health and /process-event endpoints secured with Bearer API_TOKEN.
  • Pydantic models and validation for SIM/Created and SIM/Updated events.
  • Async HubSpot client with retries (exponential backoff on 5xx responses) and a simple in-memory rate limiter (10 requests per second).
  • Modular layout suitable for container deployment.

You can find the full example repository here: HubSpot CRM Connector

Technologies used

  • Python 3.10
  • FastAPI for the web framework
  • Uvicorn as ASGI server
  • Pydantic for data validation and settings management
  • httpx as async HTTP client for making external API requests
  • python-dotenv for loading environment variables from a .env file
  • pytest for writing and running tests

Dependencies (from requirements.txt):

fastapi==0.103.2
uvicorn[standard]==0.22.0
pydantic==2.5.2
pydantic-settings
httpx==0.24.1
python-dotenv==1.0.0
pytest==7.4.0
pytest-asyncio==0.22.0

Project layout

hubspot-crm-connector/
├── src/
│   ├── __init__.py
│   ├── config.py
│   ├── hubspot.py
│   ├── main.py
│   ├── models.py
│   └── utils.py
├── tests/
│   ├── conftest.py
│   ├── test_invalid_values.py
│   ├── test_main.py
│   └── test_models.py
├── .env.example
└── requirements.txt

Code explanation

1. Environment Configuration

This snippet defines and loads application settings from environment variables using Pydantic Settings.

src/config.py
from pydantic import Field
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    ACCESS_TOKEN: str = Field(...)
    API_TOKEN: str = Field(...)
    EXTERNAL_API_BASE_URL: str = Field("https://api.hubapi.com/crm/v3/objects/contacts")
    LOG_LEVEL: str = Field("INFO")
    HTTP_TIMEOUT_SECONDS: int = Field(10)

    class Config:
        env_file = ".env"

A settings instance is then created, making all configuration values available throughout the application.

src/config.py
settings = Settings()

2. Data Models and Validation

These models define the structure and validation logic for event data received from NSPS before sending it to HubSpot.

SIM card information model: validates SIM-related fields, ensuring i_sim_card is an integer and imsi consists of exactly 15 digits.

src/models.py
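import re
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ValidationError, field_validator


class SimInfo(BaseModel):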
    i_sim_card: Optional[int] = None
    imsi: Optional[str] = None

    @field_validator("i_sim_card")
    @classmethod
    def i_sim_card_int(cls, v):
        if v is None:
            return v
        if not isinstance(v, int):
            raise ValueError("pb_data.sim_info.i_sim_card must be integer")
        return v

    @field_validator("imsi")
    @classmethod
    def imsi_valid(cls, v):
        if v is None:
            return v
        if not re.match(r"^[0-9]{15}$", v):
            raise ValueError("pb_data.sim_info.imsi must be 15 digits")
        return v
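
To see how the imsi pattern behaves on concrete inputs, here is a small standalone sketch that uses only the standard library. The helper name is_valid_imsi and the sample values are hypothetical and not part of the connector. The sketch uses re.fullmatch, which is slightly stricter than re.match with ^...$ because, with re.match, $ also matches just before a trailing newline.

```python
import re

# Same character class and length as the validator above.
IMSI_PATTERN = re.compile(r"[0-9]{15}")


def is_valid_imsi(value: str) -> bool:
    """Hypothetical helper: True only if value is exactly 15 ASCII digits."""
    return IMSI_PATTERN.fullmatch(value) is not None


if __name__ == "__main__":
    print(is_valid_imsi("001010123456789"))    # True: 15 digits
    print(is_valid_imsi("00101012345678"))     # False: 14 digits
    print(is_valid_imsi("00101012345678a"))    # False: contains a letter
    print(is_valid_imsi("001010123456789\n"))  # False: trailing newline rejected
    # For comparison, re.match with ^...$ accepts the trailing newline:
    print(bool(re.match(r"^[0-9]{15}$", "001010123456789\n")))  # True
```

If a trailing newline must be rejected by the model validator as well, switching it to re.fullmatch would be one option.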

Account information model: represents customer account data and validates key fields like i_account, email, and phone1.

src/models.py
class AccountInfo(BaseModel):
    i_account: Optional[int] = None
    email: Optional[str] = None
    phone1: Optional[str] = None
    firstname: Optional[str] = None
    lastname: Optional[str] = None
    assigned_addons: Optional[List[Any]] = None
    billing_model: Optional[str] = None

    @field_validator("i_account")
    @classmethod
    def i_account_int(cls, v):
        if v is None:
            return v
        if not isinstance(v, int):
            raise ValueError("pb_data.account_info.i_account must be integer")
        return v

    @field_validator("email")
    @classmethod
    def email_valid(cls, v):
        if v is None:
            return v
        if not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", v):
            raise ValueError("pb_data.account_info.email must be a valid email")
        return v

    @field_validator("phone1")
    @classmethod
    def phone_valid(cls, v):
        if v is None:
            return v
        if not re.match(r"^[0-9]{10}$", v):
            raise ValueError("pb_data.account_info.phone1 must contain exactly 10 digits")
        return v

NSPS event model: represents an NSPS event and maps the validated data to HubSpot CRM contact properties, enforcing the fields required for each event type and filtering out empty values before sending.

src/models.py
class NSPSEvent(BaseModel):
    event_id: str
    data: EventData
    pb_data: Dict[str, Any]

    def to_hubspot_properties(self) -> Dict[str, Any]:
        ai = self.pb_data.get("account_info", {}) or {}
        si = self.pb_data.get("sim_info", {}) or {}

        try:
            account = AccountInfo.model_validate(ai)
            sim = SimInfo.model_validate(si)
        except ValidationError as e:
            # re-raise as ValueError to keep upstream handling simple
            raise ValueError(str(e))

        # required fields for SIM/Created
        if self.data.event_type == "SIM/Created":
            required = ["i_account", "email", "phone1", "firstname", "lastname", "assigned_addons", "i_sim_card", "imsi", "billing_model"]
            for f in required:
                if f in ("i_sim_card", "imsi"):
                    if getattr(sim, f) is None:
                        raise ValueError(f"{f} is required for SIM/Created")
                else:
                    if ai.get(f) is None:
                        raise ValueError(f"{f} is required for SIM/Created")

        # for SIM/Updated only email is required
        if self.data.event_type == "SIM/Updated":
            if ai.get("email") is None:
                raise ValueError("pb_data.account_info.email is required for SIM/Updated")

        has_add_ons = bool(ai.get("assigned_addons"))

        props = {
            "email": account.email,
            "firstname": account.firstname,
            "lastname": account.lastname,
            "phone": account.phone1,
            "account_id": account.i_account,
            "has_add_ons": has_add_ons,
            "sim_card_id": sim.i_sim_card,
            "imsi": sim.imsi,
            # e.g. "credit_account" -> "CREDIT ACCOUNT"
            "type": account.billing_model.upper().replace("_", " ") if account.billing_model else None,
        }

        # remove None values to avoid sending empty properties
        return {k: v for k, v in props.items() if v is not None}
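
As a worked example of the mapping, the sketch below reproduces only the property-mapping step on plain dictionaries, without Pydantic. The function name map_to_hubspot_properties and all sample values are hypothetical. Validation and the required-field checks are deliberately left out.

```python
from typing import Any, Dict


def map_to_hubspot_properties(account_info: Dict[str, Any], sim_info: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for the mapping step (no validation)."""
    billing_model = account_info.get("billing_model")
    props = {
        "email": account_info.get("email"),
        "firstname": account_info.get("firstname"),
        "lastname": account_info.get("lastname"),
        "phone": account_info.get("phone1"),
        "account_id": account_info.get("i_account"),
        "has_add_ons": bool(account_info.get("assigned_addons")),
        "sim_card_id": sim_info.get("i_sim_card"),
        "imsi": sim_info.get("imsi"),
        "type": billing_model.upper().replace("_", " ") if billing_model else None,
    }
    # Only None is dropped; False and 0 are kept.
    return {k: v for k, v in props.items() if v is not None}


if __name__ == "__main__":
    # Sample values are made up for illustration.
    account = {
        "i_account": 1001,
        "email": "jane.doe@example.com",
        "phone1": "5551234567",
        "assigned_addons": [],
        "billing_model": "credit_account",
    }
    sim = {"i_sim_card": 42, "imsi": "001010123456789"}
    print(map_to_hubspot_properties(account, sim))
```

Two details are worth noting: an empty assigned_addons list yields has_add_ons set to False, which is still sent because only None values are filtered out, and missing names (firstname, lastname here) are simply omitted from the payload.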

3. HubSpot Client and Error Handling

This section defines a lightweight asynchronous client for interacting with the HubSpot API, including error handling, retries, and simple rate limiting.

Custom exception for client errors: wraps client and network errors, optionally including the HTTP status code.

src/hubspot.py
class ESClientError(Exception):
    def __init__(self, message: str, status_code: Optional[int] = None):
        super().__init__(message)
        self.status_code = status_code

Asynchronous HubSpot API client: initializes an async HTTP client with authentication, configurable timeout, and an in-memory rate limiter (max 10 requests per second).

src/hubspot.py
class HubSpotClient:
    def __init__(self, base_url: str, token: str, timeout: int = 10):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=httpx.Timeout(timeout))

        # simple in-memory rate limiting
        self._lock = asyncio.Lock()
        self._last_window = int(time.time())
        self._window_count = 0
        self._max_per_sec = 10

Rate limiting: a basic fixed-window mechanism that allows at most 10 requests per one-second window, to avoid exceeding HubSpot’s request limits.

src/hubspot.py
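    async def _acquire_slot(self):
        # Loop instead of recursing: asyncio.Lock is not reentrant, so calling
        # _acquire_slot() again while still holding the lock would deadlock.
        while True:
            async with self._lock:
                now = int(time.time())
                if now != self._last_window:
                    self._last_window = now
                    self._window_count = 0
                if self._window_count < self._max_per_sec:
                    self._window_count += 1
                    return
            # window is full: wait outside the lock, then re-check
            await asyncio.sleep(0.1)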
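
The window bookkeeping is easier to follow in isolation. The following is a minimal synchronous sketch of the same fixed-window idea with the clock passed in explicitly, so the behavior can be checked without waiting. The class name FixedWindowCounter and its try_acquire method are hypothetical and do not exist in the connector.

```python
from typing import Optional


class FixedWindowCounter:
    """Hypothetical sketch: allow at most max_per_sec calls per one-second window."""

    def __init__(self, max_per_sec: int = 10):
        self.max_per_sec = max_per_sec
        self._window: Optional[int] = None  # whole-second timestamp of the current window
        self._count = 0

    def try_acquire(self, now: float) -> bool:
        window = int(now)
        if window != self._window:
            # a new second has started: reset the counter
            self._window = window
            self._count = 0
        if self._count >= self.max_per_sec:
            return False  # window is full; the caller should wait and retry
        self._count += 1
        return True


if __name__ == "__main__":
    limiter = FixedWindowCounter(max_per_sec=10)
    first_ten = [limiter.try_acquire(100.0 + i * 0.05) for i in range(10)]
    print(all(first_ten))               # True: ten requests fit in window 100
    print(limiter.try_acquire(100.95))  # False: eleventh request in the same window
    print(limiter.try_acquire(101.0))   # True: a new window has started
```

A known property of fixed windows is that a burst at the end of one window followed by a burst at the start of the next can briefly exceed the nominal rate. If that matters, a sliding window or token bucket is a common alternative.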

Resilient request handling: sends HTTP requests with retry and exponential backoff for transient errors.

src/hubspot.py
    async def _request_with_retries(self, method: str, url: str, **kwargs) -> httpx.Response:
        await self._acquire_slot()
        # build headers once so caller-supplied headers survive retries
        headers = dict(kwargs.pop("headers", None) or {})
        headers.update({
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        })
        attempts = 0
        backoff = 0.5
        while True:
            try:
                resp = await self._client.request(method, url, headers=headers, **kwargs)
                if 200 <= resp.status_code < 300:
                    return resp
                if 400 <= resp.status_code < 500:
                    # client error -> do not retry, surface the status code
                    raise ESClientError(f"Client error: {resp.status_code} {resp.text}", status_code=resp.status_code)
                # 5xx -> retry up to 3 times, sleeping 0.5s, 1s, 2s
                attempts += 1
                if attempts > 3:
                    raise ESClientError(f"ES request failed after retries: {url}")
                await asyncio.sleep(backoff)
                backoff *= 2
                continue
            except httpx.RequestError as e:
                # network error -> retry once, and only if no attempt has failed before
                if attempts == 0:
                    attempts += 1
                    await asyncio.sleep(0.5)
                    continue
                raise ESClientError("Network error while connecting to ES") from e
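
To make the backoff arithmetic concrete: starting at 0.5 seconds and doubling after each failed attempt, three retries wait 0.5 s, 1 s, and 2 s, so a request that keeps returning 5xx gives up after four attempts and about 3.5 seconds of waiting. The helper below is a hypothetical illustration of that schedule, not code from the connector.

```python
from typing import List


def backoff_delays(initial: float = 0.5, max_retries: int = 3, factor: float = 2.0) -> List[float]:
    """Hypothetical helper: delays slept before each retry."""
    delays = []
    delay = initial
    for _ in range(max_retries):
        delays.append(delay)
        delay *= factor
    return delays


if __name__ == "__main__":
    delays = backoff_delays()
    print(delays)           # [0.5, 1.0, 2.0]
    print(sum(delays))      # 3.5 seconds of waiting in the worst case
    print(len(delays) + 1)  # 4 requests in total before giving up
```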

4. HubSpot Contact Management Methods

These methods define how the connector server interacts with HubSpot’s CRM API to create or update contact records.

Creating a new contact: sends a POST request to create a new contact in HubSpot from the provided contact properties. Transient errors are retried automatically by the client’s built-in retry logic.

src/hubspot.py
    async def create_contact(self, properties: Dict[str, Any]) -> httpx.Response:
        payload = {"properties": properties}
        return await self._request_with_retries("POST", self.base_url, json=payload)

Updating an existing contact: validates that the email is provided and sends a PATCH request with updated properties, retrying automatically if needed.

src/hubspot.py
    async def update_contact_by_email(self, email: str, properties: Dict[str, Any]) -> httpx.Response:
        if not email:
            raise ESClientError("email is required to update contact")
        # HubSpot: use ?idProperty=email to locate by email
        url = f"{self.base_url}/{email}?idProperty=email"
        payload = {"properties": properties}
        return await self._request_with_retries("PATCH", url, json=payload)
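
Because the email address becomes part of the URL path, characters such as /, ? or # in the local part would change how the URL is parsed. One possible hardening, shown here as a sketch, is to percent-encode the address before interpolating it. The helper name build_update_url is hypothetical, and the sample addresses are made up.

```python
from urllib.parse import quote


def build_update_url(base_url: str, email: str) -> str:
    """Hypothetical helper: percent-encode the email before placing it in the path."""
    # safe="" also encodes "/" so the address cannot add extra path segments
    encoded = quote(email, safe="")
    return f"{base_url.rstrip('/')}/{encoded}?idProperty=email"


if __name__ == "__main__":
    base = "https://api.hubapi.com/crm/v3/objects/contacts"
    print(build_update_url(base, "jane.doe@example.com"))
    # https://api.hubapi.com/crm/v3/objects/contacts/jane.doe%40example.com?idProperty=email
    print(build_update_url(base, "a/b?c@example.com"))
    # https://api.hubapi.com/crm/v3/objects/contacts/a%2Fb%3Fc%40example.com?idProperty=email
```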

5. Connector Server Setup

This part configures the FastAPI application that acts as a connector between NSPS and HubSpot, handling authentication, event validation, and API communication.

Application initialization: initializes the FastAPI app, loads environment variables, configures bearer authentication, and creates a HubSpot client instance using credentials from settings.

src/main.py
load_dotenv()

app = FastAPI()
_start_time = time.time()
bearer = HTTPBearer()

client = HubSpotClient(base_url=settings.EXTERNAL_API_BASE_URL, token=settings.ACCESS_TOKEN)

Token verification: defines a dependency that validates incoming API requests using a Bearer token and rejects unauthorized requests with an informative 401 response.

src/main.py
def verify_bearer_token(credentials: HTTPAuthorizationCredentials = Depends(bearer)):
    token = credentials.credentials
    if token != settings.API_TOKEN:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail={
            "message": "Authentication failed",
            "error": "Invalid API token",
            "type": "AUTHENTICATION_ERROR",
        })
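
A plain != comparison on secrets can, in principle, leak timing information. A common hardening is a constant-time comparison from the standard library. The sketch below shows the idea in isolation. The function name tokens_match is hypothetical, and wiring it into the dependency is left as an assumption.

```python
import hmac


def tokens_match(provided: str, expected: str) -> bool:
    """Hypothetical helper: compare tokens in constant time."""
    # Encoding to bytes lets compare_digest handle non-ASCII input as well.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


if __name__ == "__main__":
    print(tokens_match("s3cret-token", "s3cret-token"))  # True
    print(tokens_match("wrong-token", "s3cret-token"))   # False
```

In the dependency, the check would then read `if not tokens_match(token, settings.API_TOKEN):`, with the same 401 response as before.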

Event processing endpoint: defines the /process-event endpoint that:

  • validates and parses incoming event data;
  • checks event type and required fields;
  • creates or updates a HubSpot contact accordingly;
  • handles validation, authentication, and service errors with clear structured responses.

src/main.py
@app.post("/process-event")
async def process_event(request: Request, credentials: HTTPAuthorizationCredentials = Depends(verify_bearer_token)):
    ctx = extract_request_context(request)
    body = await request.json()
    try:
        event = NSPSEvent.model_validate(body)
    except Exception as e:
        log_with_context(ctx, "Validation failed", {"error": str(e)})
        return JSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={
            "message": "Validation failed",
            "error": str(e),
            "type": "VALIDATION_ERROR",
        })

    event_type = event.data.event_type
    log_with_context(ctx, "Received event", {"event_type": event_type, "event_id": event.event_id})

    if event_type not in ("SIM/Created", "SIM/Updated"):
        log_with_context(ctx, f"Unsupported event type: {event_type}")
        return JSONResponse(status_code=status.HTTP_200_OK, content={"message": f"Unsupported event type: {event_type}"})

    # map and validate required fields for supported event types
    try:
        props = event.to_hubspot_properties()
    except ValueError as e:
        log_with_context(ctx, "Validation failed for field", {"error": str(e)})
        return JSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={
            "message": "Validation failed",
            "error": str(e),
            "type": "VALIDATION_ERROR",
        })

    try:
        if event_type == "SIM/Created":
            resp = await client.create_contact(properties=props)
        else:
            resp = await client.update_contact_by_email(email=props.get("email"), properties=props)

        log_with_context(ctx, "ES request succeeded", {"status": resp.status_code})
        return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content={"message": "Event accepted for processing"})

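    # ESClientError is the exception class defined in src/hubspot.py;
    # `client` is a HubSpotClient instance and has no such attribute
    except ESClientError as e: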
        # map specific status codes
        if getattr(e, "status_code", None) == 429:
            log_with_context(ctx, "ES rate limit", {"error": str(e)})
            return JSONResponse(status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={
                "message": "Too many requests to ES",
                "error": str(e),
                "type": "RATE_LIMIT_ERROR",
            })

        # client errors -> alert
        if getattr(e, "status_code", None) and 400 <= e.status_code < 500:
            log_with_context(ctx, "ES client error", {"error": str(e)})
            return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content={
                "message": f"Client error: {e.status_code}",
                "error": str(e),
                "type": "SERVICE_ERROR",
            })

        # otherwise treat as service error (5xx or network)
        log_with_context(ctx, "ES request failed", {"error": str(e)})
        return JSONResponse(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, content={
            "message": "ES request failed",
            "error": str(e),
            "type": "SERVICE_ERROR",
        })
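
The error branches at the end of the handler amount to a small decision table: 429 from HubSpot is passed through, any other 4xx becomes 400, and everything else (exhausted 5xx retries or network failures, where no status code is attached to the exception) becomes 503. The function below restates that table as a standalone sketch. The name downstream_error_to_status is hypothetical.

```python
from typing import Optional


def downstream_error_to_status(status_code: Optional[int]) -> int:
    """Hypothetical helper: map a downstream error status to the connector's response status."""
    if status_code == 429:
        return 429  # rate limited by the external system
    if status_code is not None and 400 <= status_code < 500:
        return 400  # other client errors
    return 503      # 5xx after retries, or network error (no status code)


if __name__ == "__main__":
    for code in (429, 404, 401, 500, None):
        print(code, "->", downstream_error_to_status(code))
```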

6. Request Context and Logging Utilities

These helper functions provide structured, traceable logging for each request, enabling easier debugging and monitoring.

Extracting request context: generates or retrieves unique identifiers (trace_id, request_id) from request headers to track each request through the system.

src/utils.py
def extract_request_context(req: Request):
    trace_id = req.headers.get("x-b3-traceid") or str(uuid4())
    request_id = req.headers.get("x-request-id") or str(uuid4())
    return {"trace_id": trace_id, "request_id": request_id}

Structured logging: logs messages as JSON with timestamps and request context for consistent, machine-readable output, which is useful for tracing events across distributed services.

src/utils.py
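# these imports belong at the top of src/utils.py
import json
from datetime import datetime, timezone
from typing import Optional


def log_with_context(context: dict, message: str, data: Optional[dict] = None):
    out = {
        # timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),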
        "trace_id": context.get("trace_id"),
        "request_id": context.get("request_id"),
        "message": message,
    }
    if data:
        out["data"] = data
    print(json.dumps(out))
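
To show what a resulting log line looks like, here is a standalone sketch that mirrors the two helpers but takes a plain dictionary of headers and returns the JSON string instead of printing it. The names extract_context and format_log and the sample header values are hypothetical. Note that, unlike the request headers object in the framework, a plain dict lookup is case-sensitive.

```python
import json
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4


def extract_context(headers: dict) -> dict:
    """Hypothetical helper: reuse incoming IDs, or generate new ones."""
    return {
        "trace_id": headers.get("x-b3-traceid") or str(uuid4()),
        "request_id": headers.get("x-request-id") or str(uuid4()),
    }


def format_log(context: dict, message: str, data: Optional[dict] = None) -> str:
    """Hypothetical helper: build one JSON log line."""
    out = {
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "trace_id": context.get("trace_id"),
        "request_id": context.get("request_id"),
        "message": message,
    }
    if data:
        out["data"] = data
    return json.dumps(out)


if __name__ == "__main__":
    ctx = extract_context({"x-request-id": "req-1"})  # trace_id is generated
    print(format_log(ctx, "Received event", {"event_type": "SIM/Created"}))
```

Each line is a single JSON object, so log collectors can filter on trace_id or request_id without custom parsing.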