Skip to content

HTTP binding

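Using the HTTP binding handler is straightforward. Just remember to create the HTTPHandler once and reuse that same instance for every call. Of the two examples below, the first creates the handler once at module level (recommended), while the second creates a new handler on every function call (avoid this).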

from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

http_handler = HTTPHandler(OrderCreated)

def do_something():
    """Parse a JSON body using the shared module-level handler."""
    # http_handler was created once, outside this function, so every call
    # reuses the same instance.
    http_handler.from_json("json_body")
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

def do_something():
    """Parse a JSON body using a handler created inside the function."""
    # NOTE: a new HTTPHandler is built on every call here. This is the pattern
    # the surrounding text advises against; prefer one shared instance.
    http_handler = HTTPHandler(OrderCreated)
    http_handler.from_json("json_body")

Why do you have to reuse the same object?

When an HTTPHandler instance is created, it internally creates Pydantic TypeAdapter instances for the event class, in order to handle event serialization and discriminated unions efficiently. This is an expensive operation; see the Pydantic documentation for details.

Deserialize an event

HTTP deserialization parses the body to reconstruct the event.

from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

single_event_json = '{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}'
batch_event_json = '[{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}]'

# Handler created once for the custom OrderCreated event class.
http_handler = HTTPHandler(OrderCreated)

# Single JSON event
event = http_handler.from_json(single_event_json)
# JSON batch (list) of events
batch_of_events = http_handler.from_json_batch(batch_event_json)
# HTTP Binary event (using the raw request headers and body)
# The empty headers and the "body" string are presumably placeholders for
# real request data.
event = http_handler.from_binary(headers={}, body="body")
from cloudevents_pydantic.bindings.http import HTTPHandler

single_event_json = '{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}'
batch_event_json = '[{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}]'

# No event class is passed to the handler here.
# NOTE(review): presumably it then works with the base CloudEvent class;
# confirm against the HTTPHandler API.
http_handler = HTTPHandler()

# Single JSON event
event = http_handler.from_json(single_event_json)
# JSON batch (list) of events
batch_of_events = http_handler.from_json_batch(batch_event_json)
# HTTP Binary event (using the raw request headers and body)
# The empty headers and the "body" string are presumably placeholders for
# real request data.
event = http_handler.from_binary(headers={}, body="body")
Use discriminated Unions to handle multiple Event classes

To handle multiple event classes, use a discriminated union as the event class and share a single HTTPHandler across all of them. This makes validation more efficient and produces a correct schema.

from typing import Annotated, Literal, Union

from pydantic import Field
from typing_extensions import TypedDict

from cloudevents_pydantic.bindings.http import HTTPHandler
from cloudevents_pydantic.events import CloudEvent


class OrderCreatedEvent(CloudEvent):
    """CloudEvent subclass whose type is fixed to "order_created"."""

    # Payload: a dict with a single string "order_id" key.
    data: TypedDict("OrderCreatedData", {"order_id": str})
    # Fixed literal value; "type" is the field the Event union names as its
    # discriminator.
    type: Literal["order_created"]


class CustomerCreatedEvent(CloudEvent):
    """CloudEvent subclass whose type is fixed to "customer_created"."""

    # Payload: a dict with a single string "customer_id" key.
    data: TypedDict("CustomerCreatedData", {"customer_id": str})
    # Fixed literal value; "type" is the field the Event union names as its
    # discriminator.
    type: Literal["customer_created"]


Event = Annotated[
    Union[OrderCreatedEvent, CustomerCreatedEvent],
    Field(discriminator="type"),
]

http_handler = HTTPHandler(Event)

customer_event_json = '{"data":{"customer_id":"123"},"source":"customer_service","id":"123","type":"customer_created","specversion":"1.0","time":null,"subject":null,"datacontenttype":null,"dataschema":null}'

print(type(http_handler.from_json(customer_event_json)))
# <class '__main__.CustomerCreatedEvent'>

FastAPI

Both this package and FastAPI are built on top of Pydantic. This means you don't need to instantiate an HTTPHandler to receive CloudEvents using a FastAPI endpoint.

### Event classes omitted ###

Event = Annotated[
    Union[OrderCreatedEvent, CustomerCreatedEvent],
    Field(discriminator="type"),
]

# Endpoint for single events
@router.post("/event", status_code=204)
async def submit_event(
    event: Annotated[Event, Body()],
    content_type: Annotated[
        Literal["application/cloudevents+json; charset=UTF-8"], Header()
    ],
) -> None:
    """Accept a single event from the request body and pass it to do_something.

    content_type is declared as a header parameter restricted to one literal
    value; it is not used inside the function body.
    """
    do_something(event)

# Endpoint for event batches
@router.post("/batch", status_code=204)
async def submit_event_batch(
    event_batch: Annotated[List[Event], Body()],
    content_type: Annotated[
        Literal["application/cloudevents-batch+json; charset=UTF-8"], Header()
    ],
) -> None:
    """Accept a list of events from the request body and process each in order.

    content_type is declared as a header parameter restricted to one literal
    value; it is not used inside the function body.
    """
    # Each event in the batch is handed to do_something individually.
    for event in event_batch:
        do_something(event)

Generate the OpenAPI schema correctly

In order to have the OpenAPI spec correctly generated by FastAPI you'll need to work around some FastAPI limitations and manually specify some of the needed data. You can find a detailed example here.

Serialize an event

HTTP serialization returns the headers and body to be used in an HTTP request.

from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

minimal_attributes = {
    "type": "order_created",
    "source": "https://example.com/event-producer",
    "id": "b96267e2-87be-4f7a-b87c-82f64360d954",
    "specversion": "1.0",
}

http_handler = HTTPHandler(OrderCreated)
event = OrderCreated.event_factory(**minimal_attributes)

# Each call returns the headers and body to use in an HTTP request.
# Single event
headers, body = http_handler.to_json(event)
# Batch (list) of events
headers, body = http_handler.to_json_batch([event])
# HTTP Binary event (serialization counterpart of from_binary; calling
# to_json here would just repeat the single-event JSON example)
headers, body = http_handler.to_binary(event)
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

minimal_attributes = {
    "type": "order_created",
    "source": "https://example.com/event-producer",
    "id": "b96267e2-87be-4f7a-b87c-82f64360d954",
    "specversion": "1.0",
}

http_handler = HTTPHandler()
event = CloudEvent.event_factory(**minimal_attributes)

# Each call returns the headers and body to use in an HTTP request.
# Single event
headers, body = http_handler.to_json(event)
# Batch (list) of events
headers, body = http_handler.to_json_batch([event])
# HTTP Binary event (serialization counterpart of from_binary; calling
# to_json here would just repeat the single-event JSON example)
headers, body = http_handler.to_binary(event)