Skip to content

HTTP binding

Using the HTTP binding handler is straightforward. Just remember to reuse the same HTTPHandler multiple times.

from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

http_handler = HTTPHandler(OrderCreated)

def do_something():
    http_handler.from_json("json_body")
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

def do_something():
    http_handler = HTTPHandler(OrderCreated)
    http_handler.from_json("json_body")

Why you have to reuse the same object?

When the HTTPHandler instance is created it creates internally instances of Pydantic TypeAdapter for the event class, to handle efficiently event serialization and discriminated unions. This is an expensive operation. Check the Pydantic documentation about this.

Deserialize a JSON event

HTTP deserialization parses the body to reconstruct the event.

from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

single_event_json = '{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}'
batch_event_json = '[{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}]'

http_handler = HTTPHandler(OrderCreated)

# Single event
event = http_handler.from_json(single_event_json)
# Batch (list) of events
batch_of_events = http_handler.from_json_batch(batch_event_json)
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

minimal_attributes = {
    "type": "order_created",
    "source": "https://example.com/event-producer",
    "id": "b96267e2-87be-4f7a-b87c-82f64360d954",
    "specversion": "1.0",
}

http_handler = HTTPHandler()
event = CloudEvent.event_factory(**minimal_attributes)

# Single event
event = http_handler.to_json(event)
# Batch (list) of events
batch_of_events = http_handler.to_json_batch([event])
Use discriminated Unions to handle multiple Event classes

You'll want to use discriminated unions as event class and use a single HTTPHandler for multiple Event classes to be more efficient on validation and to produce a correct schema.

from typing import Annotated, Literal, Union

from pydantic import Field
from typing_extensions import TypedDict

from cloudevents_pydantic.bindings.http import HTTPHandler
from cloudevents_pydantic.events import CloudEvent


class OrderCreatedEvent(CloudEvent):
    data: TypedDict("OrderCreatedData", {"order_id": str})
    type: Literal["order_created"]


class CustomerCreatedEvent(CloudEvent):
    data: TypedDict("CustomerCreatedData", {"customer_id": str})
    type: Literal["customer_created"]


Event = Annotated[
    Union[OrderCreatedEvent, CustomerCreatedEvent],
    Field(discriminator="type"),
]

http_handler = HTTPHandler(Event)

customer_event_json = '{"data":{"customer_id":"123"},"source":"customer_service","id":"123","type":"customer_created","specversion":"1.0","time":null,"subject":null,"datacontenttype":null,"dataschema":null}'

print(type(http_handler.from_json(customer_event_json)))
# <class '__main__.CustomerCreatedEvent'>

FastAPI

Both this package and FastAPI are built on top of Pydantic. This means you don't need to instantiate a HTTPHandler to receive CloudEvents using a FastAPI endpoint.

### Event classes omitted ###

Event = Annotated[
    Union[OrderCreatedEvent, CustomerCreatedEvent],
    Field(discriminator="type"),
]

# Endpoint for single events
@router.post("/event", status_code=204)
async def submit_event(
    event: Annotated[Event, Body()],
    content_type: Annotated[
        Literal["application/cloudevents+json; charset=UTF-8"], Header()
    ],
) -> None:
    do_something(event)

# Endpoint for event batches
@router.post("/batch", status_code=204)
async def submit_event_batch(
    event_batch: Annotated[List[Event], Body()],
    content_type: Annotated[
        Literal["application/cloudevents-batch+json; charset=UTF-8"], Header()
    ],
) -> None:
    for event in event_batch:
        do_something(event)

Generate the OpenAPI schema correctly

In order to have the OpenAPI spec correctly generated by FastAPI you'll need to work around some FastAPI limitations and manually specify some of the needed data. You can find a detailed example here.

Serialize a JSON event

HTTP serialization returns header and body to be used in a HTTP request.

from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

class OrderCreated(CloudEvent):
    ...

minimal_attributes = {
    "type": "order_created",
    "source": "https://example.com/event-producer",
    "id": "b96267e2-87be-4f7a-b87c-82f64360d954",
    "specversion": "1.0",
}

http_handler = HTTPHandler(OrderCreated)
event = OrderCreated.event_factory(**minimal_attributes)

# Single event
headers, body = http_handler.to_json(event)
# Batch (list) of events
headers, body = http_handler.to_json_batch([event])
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler

minimal_attributes = {
    "type": "order_created",
    "source": "https://example.com/event-producer",
    "id": "b96267e2-87be-4f7a-b87c-82f64360d954",
    "specversion": "1.0",
}

http_handler = HTTPHandler()
event = CloudEvent.event_factory(**minimal_attributes)

# Single event
json_string = http_handler.to_json(event)
# Batch (list) of events
json_batch_string = http_handler.to_json_batch([event])