HTTP binding
Using the HTTP binding handler is straightforward. Just remember to reuse the same
HTTPHandler
multiple times.
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler
class OrderCreated(CloudEvent):
...
http_handler = HTTPHandler(OrderCreated)
def do_something():
http_handler.from_json("json_body")
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler
class OrderCreated(CloudEvent):
...
def do_something():
http_handler = HTTPHandler(OrderCreated)
http_handler.from_json("json_body")
Why you have to reuse the same object?
When the HTTPHandler instance is created it creates internally instances of Pydantic TypeAdapter
for the event class, to handle efficiently event serialization and discriminated unions. This is
an expensive operation. Check the Pydantic documentation about this.
Deserialize an event
HTTP deserialization parses the body to reconstruct the event.
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler
class OrderCreated(CloudEvent):
...
single_event_json = '{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}'
batch_event_json = '[{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}]'
http_handler = HTTPHandler(OrderCreated)
# Single JSON event
event = http_handler.from_json(single_event_json)
# JSON batch (list) of events
batch_of_events = http_handler.from_json_batch(batch_event_json)
# HTTP Binary event (using the raw request headers and body)
event = http_handler.from_binary(headers={}, body="body")
from cloudevents_pydantic.bindings.http import HTTPHandler
single_event_json = '{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}'
batch_event_json = '[{"data":null,"source":"https://example.com/event-producer","id":"b96267e2-87be-4f7a-b87c-82f64360d954","type":"com.example.string","specversion":"1.0","time":"2022-07-16T12:03:20.519216+04:00","subject":null,"datacontenttype":null,"dataschema":null}]'
http_handler = HTTPHandler()
# Single JSON event
event = http_handler.from_json(single_event_json)
# JSON batch (list) of events
batch_of_events = http_handler.from_json_batch(batch_event_json)
# HTTP Binary event (using the raw request headers and body)
event = http_handler.from_binary(headers={}, body="body")
Use discriminated Unions to handle multiple Event classes
You'll want to use discriminated unions
as event class and use a single HTTPHandler
for multiple Event classes to be more efficient on validation
and to produce a correct schema.
from typing import Annotated, Literal, Union
from pydantic import Field
from typing_extensions import TypedDict
from cloudevents_pydantic.bindings.http import HTTPHandler
from cloudevents_pydantic.events import CloudEvent
class OrderCreatedEvent(CloudEvent):
data: TypedDict("OrderCreatedData", {"order_id": str})
type: Literal["order_created"]
class CustomerCreatedEvent(CloudEvent):
data: TypedDict("CustomerCreatedData", {"customer_id": str})
type: Literal["customer_created"]
Event = Annotated[
Union[OrderCreatedEvent, CustomerCreatedEvent],
Field(discriminator="type"),
]
http_handler = HTTPHandler(Event)
customer_event_json = '{"data":{"customer_id":"123"},"source":"customer_service","id":"123","type":"customer_created","specversion":"1.0","time":null,"subject":null,"datacontenttype":null,"dataschema":null}'
print(type(http_handler.from_json(customer_event_json)))
# <class '__main__.CustomerCreatedEvent'>
FastAPI
Both this package and FastAPI are built on top
of Pydantic. This means you don't need to instantiate
a HTTPHandler
to receive CloudEvents using a FastAPI endpoint.
### Event classes omitted ###
Event = Annotated[
Union[OrderCreatedEvent, CustomerCreatedEvent],
Field(discriminator="type"),
]
# Endpoint for single events
@router.post("/event", status_code=204)
async def submit_event(
event: Annotated[Event, Body()],
content_type: Annotated[
Literal["application/cloudevents+json; charset=UTF-8"], Header()
],
) -> None:
do_something(event)
# Endpoint for event batches
@router.post("/batch", status_code=204)
async def submit_event_batch(
event_batch: Annotated[List[Event], Body()],
content_type: Annotated[
Literal["application/cloudevents-batch+json; charset=UTF-8"], Header()
],
) -> None:
for event in event_batch:
do_something(event)
Generate the OpenAPI schema correctly
In order to have the OpenAPI spec correctly generated by FastAPI you'll need to work around some FastAPI limitations and manually specify some of the needed data. You can find a detailed example here.
Serialize an event
HTTP serialization returns header and body to be used in a HTTP request.
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler
class OrderCreated(CloudEvent):
...
minimal_attributes = {
"type": "order_created",
"source": "https://example.com/event-producer",
"id": "b96267e2-87be-4f7a-b87c-82f64360d954",
"specversion": "1.0",
}
http_handler = HTTPHandler(OrderCreated)
event = OrderCreated.event_factory(**minimal_attributes)
# Single event
headers, body = http_handler.to_json(event)
# Batch (list) of events
headers, body = http_handler.to_json_batch([event])
# HTTP Binary event
headers, body = http_handler.to_json(event)
from cloudevents_pydantic.events import CloudEvent
from cloudevents_pydantic.bindings.http import HTTPHandler
minimal_attributes = {
"type": "order_created",
"source": "https://example.com/event-producer",
"id": "b96267e2-87be-4f7a-b87c-82f64360d954",
"specversion": "1.0",
}
http_handler = HTTPHandler()
event = CloudEvent.event_factory(**minimal_attributes)
# Single event
headers, body = http_handler.to_json(event)
# Batch (list) of events
headers, body = http_handler.to_json_batch([event])
# HTTP Binary event
headers, body = http_handler.to_json(event)