Skip to content

Repository

sqlalchemy_bind_manager.repository

sqlalchemy_bind_manager.repository.SQLAlchemyRepository

Source code in sqlalchemy_bind_manager/_repository/sync.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class SQLAlchemyRepository(
    Generic[MODEL],
    BaseRepository[MODEL],
):
    """Synchronous repository exposing CRUD and search operations for a model.

    The repository is configured with exactly one of a bind, from which a
    `SessionHandler` is created, or an externally provided session.
    """

    # Created only when the repository is initialised with a bind.
    _session_handler: SessionHandler
    # Session supplied by the caller, or None when a bind is used.
    _external_session: Union[Session, None]

    def __init__(
        self,
        bind: Union[SQLAlchemyBind, None] = None,
        session: Union[Session, None] = None,
        model_class: Union[Type[MODEL], None] = None,
    ) -> None:
        """Initialise the repository with exactly one of `bind` or `session`.

        :param bind: The bind used to create a `SessionHandler`
        :param session: An externally managed session to operate on
        :param model_class: The model class, forwarded to the base repository
        :raises InvalidConfigError: When both or neither of `bind` and
            `session` are provided
        """
        super().__init__(model_class=model_class)
        # Exactly one of the two must be truthy.
        if bool(bind) == bool(session):
            raise InvalidConfigError(
                "Either `bind` or `session` have to be used, not both"
            )
        self._external_session = session
        if bind:
            self._session_handler = SessionHandler(bind)

    def get(self, identifier: PRIMARY_KEY) -> MODEL:
        """Get a model by primary key.

        :param identifier: The primary key
        :return: A model instance
        :raises ModelNotFoundError: No model has been found using the primary key
        """
        with self._get_session(commit=False) as session:
            model = session.get(self._model, identifier)
        # The error is raised only after the session context has been exited.
        if model is None:
            raise ModelNotFoundError("No rows found for provided primary key.")
        return model

    def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
        """Get a list of models by primary keys.

        :param identifiers: A list of primary keys
        :return: A list of models
        :raises NotImplementedError: Propagated from `_model_pk` when the model
            has a composite primary key
        """
        stmt = select(self._model).where(
            getattr(self._model, self._model_pk()).in_(identifiers)
        )

        with self._get_session(commit=False) as session:
            return list(session.execute(stmt).scalars())

    def save(self, instance: MODEL) -> MODEL:
        """Persist a model.

        :param instance: A mapped object instance to be persisted
        :return: The same model instance that was passed in
        """
        self._fail_if_invalid_models([instance])
        with self._get_session() as session:
            session.add(instance)
        return instance

    def save_many(self, instances: Iterable[MODEL]) -> Iterable[MODEL]:
        """Persist many models in a single database session.

        :param instances: The mapped objects to be persisted; one-shot
            iterators (e.g. generators) are supported
        :return: The model instances that were added to the session. The input
            object itself is returned unless it was a one-shot iterator, in
            which case a list of its items is returned
        """
        # An iterator returns itself from iter(); validating it would exhaust
        # it and leave nothing to add, so it is materialised first.
        if iter(instances) is instances:
            instances = list(instances)

        self._fail_if_invalid_models(instances)
        with self._get_session() as session:
            session.add_all(instances)
        return instances

    def delete(self, instance: MODEL) -> None:
        """Deletes a model.

        :param instance: The model instance
        """
        self._fail_if_invalid_models([instance])
        with self._get_session() as session:
            session.delete(instance)

    def delete_many(self, instances: Iterable[MODEL]) -> None:
        """Deletes a collection of models in a single session.

        :param instances: The model instances; one-shot iterators
            (e.g. generators) are supported
        """
        # An iterator returns itself from iter(); validating it would exhaust
        # it and leave nothing to delete, so it is materialised first.
        if iter(instances) is instances:
            instances = list(instances)

        self._fail_if_invalid_models(instances)
        with self._get_session() as session:
            for model in instances:
                session.delete(model)

    def find(
        self,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> List[MODEL]:
        """Find models using filters.

        E.g.

            # find all models with name = John
            find(search_params={"name":"John"})

            # find all models ordered by `name` column
            find(order_by=["name"])

            # find all models with reversed order by `name` column
            find(order_by=[("name", "desc")])

        :param search_params: A mapping containing equality filters
        :param order_by: Columns to order by, either as column names or as
            `(column, direction)` tuples
        :return: A collection of models
        """
        stmt = self._find_query(search_params, order_by)

        with self._get_session() as session:
            result = session.execute(stmt)
            return list(result.scalars())

    def paginated_find(
        self,
        items_per_page: int,
        page: int = 1,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> PaginatedResult[MODEL]:
        """Find models using filters and limit/offset pagination. Returned results
        do include pagination metadata.

        E.g.

            # find first 50 models with name = John
            paginated_find(50, search_params={"name":"John"})

            # find 50 models with name = John, skipping 2 pages (100)
            paginated_find(50, 3, search_params={"name":"John"})

            # find first 50 models ordered by `name` column
            paginated_find(50, order_by=["name"])

            # find first 50 models with reversed order by `name` column
            paginated_find(50, order_by=[("name", "desc")])

        :param items_per_page: Number of models to retrieve
        :param page: Page to retrieve
        :param search_params: A mapping containing equality filters
        :param order_by: Columns to order by, either as column names or as
            `(column, direction)` tuples
        :return: The models of the requested page with pagination metadata
        """
        find_stmt = self._find_query(search_params, order_by)
        paginated_stmt = self._paginate_query_by_page(find_stmt, page, items_per_page)

        with self._get_session() as session:
            # The total is counted on the unpaginated statement.
            total_items_count = (
                session.execute(self._count_query(find_stmt)).scalar() or 0
            )
            result_items = list(session.execute(paginated_stmt).scalars())

            return PaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                page=page,
                items_per_page=self._sanitised_query_limit(items_per_page),
            )

    def cursor_paginated_find(
        self,
        items_per_page: int,
        cursor_reference: Union[CursorReference, None] = None,
        is_before_cursor: bool = False,
        search_params: Union[None, Mapping[str, Any]] = None,
    ) -> CursorPaginatedResult[MODEL]:
        """Find models using filters and cursor based pagination. Returned results
        do include pagination metadata.

        E.g.

            # finds first 50 models with name = John
            cursor_paginated_find(50, search_params={"name":"John"})

            # finds first 50 models after the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123))

            # finds last 50 models before the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123), True)

        :param items_per_page: Number of models to retrieve
        :param cursor_reference: A cursor reference containing ordering column
            and threshold value
        :param is_before_cursor: If True it will return items before the cursor,
            otherwise items after
        :param search_params: A mapping containing equality filters
        :return: The models of the requested slice with pagination metadata
        """
        find_stmt = self._find_query(search_params)

        paginated_stmt = self._cursor_paginated_query(
            find_stmt,
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
            items_per_page=items_per_page,
        )

        with self._get_session() as session:
            # The total is counted on the unpaginated statement.
            total_items_count = (
                session.execute(self._count_query(find_stmt)).scalar() or 0
            )
            result_items = list(session.execute(paginated_stmt).scalars())

            return CursorPaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                items_per_page=self._sanitised_query_limit(items_per_page),
                cursor_reference=cursor_reference,
                is_before_cursor=is_before_cursor,
            )

    @contextmanager
    def _get_session(self, commit: bool = True) -> Iterator[Session]:
        """Yield the session to be used for a repository operation.

        When an external session was supplied it is yielded unchanged and
        `commit` is ignored. Otherwise a session is obtained from the session
        handler, which receives `not commit` as its argument (presumably a
        read-only flag -- confirm against `SessionHandler.get_session`).

        :param commit: Negated and forwarded to the session handler
        :return: A context manager yielding the session
        """
        if not self._external_session:
            with self._session_handler.get_session(not commit) as _session:
                yield _session
        else:
            yield self._external_session

_is_mapped_class

_is_mapped_class(class_: Type[MODEL]) -> bool

Checks if the class is mapped in SQLAlchemy.

Parameters:

Name Type Description Default
class_ Type[MODEL]

the model class

required

Returns:

Type Description
bool

True if the Type is mapped, False otherwise

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
66
67
68
69
70
71
72
73
74
75
76
77
def _is_mapped_class(self, class_: Type[MODEL]) -> bool:
    """Tell whether SQLAlchemy has a mapper for the given class.

    :param class_: the model class to check
    :return: True when `class_mapper` accepts the class, False when it
        raises `UnmappedClassError`
    :rtype: bool
    """
    try:
        class_mapper(class_)
    except UnmappedClassError:
        return False
    else:
        return True

_validate_mapped_property

_validate_mapped_property(property_name: str) -> None

Checks if a property is mapped in the model class.

Parameters:

Name Type Description Default
property_name str

The name of the property to be evaluated.

required

Raises:

Type Description
UnmappedPropertyError

When the property is not mapped.

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
79
80
81
82
83
84
85
86
87
88
89
90
91
def _validate_mapped_property(self, property_name: str) -> None:
    """Ensure a property name is one of the model's mapped column attributes.

    :param property_name: The name of the property to be evaluated.
    :type property_name: str
    :raises UnmappedPropertyError: When the name is not found among the
        mapper's `column_attrs`.
    """
    mapper: Mapper = class_mapper(self._model)
    if property_name in mapper.column_attrs:
        return

    raise UnmappedPropertyError(
        f"Property `{property_name}` is not mapped"
        f" in the ORM for model `{self._model}`"
    )

_filter_select

_filter_select(
    stmt: Select, search_params: Mapping[str, Any]
) -> Select

Build the query filtering clauses from submitted parameters.

E.g. _filter_select(stmt, {"name": "John"}) adds a WHERE name = 'John' clause

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
search_params Mapping[str, Any]

A mapping of property names to values, each used as an equality filter

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def _filter_select(self, stmt: Select, search_params: Mapping[str, Any]) -> Select:
    """Add one equality `WHERE` clause per entry of the search parameters.

    E.g.
    _filter_select(stmt, {"name": "John"}) adds a `WHERE name = John` clause

    :param stmt: a Select statement
    :type stmt: Select
    :param search_params: A mapping of property names to the values they
        must be equal to
    :type search_params: Mapping[str, Any]
    :return: The filtered query
    :raises UnmappedPropertyError: Propagated from the property validation
        when a key is not a mapped property
    """
    # TODO: Add support for relationship eager load
    for property_name, expected_value in search_params.items():
        # This acts as a TypeGuard but using TypeGuard typing would break
        # compatibility with python < 3.10, for the moment we prefer to ignore
        # typing issues here
        self._validate_mapped_property(property_name)
        model_attribute = getattr(self._model, property_name)
        stmt = stmt.where(model_attribute == expected_value)
    return stmt

_filter_order_by

_filter_order_by(
    stmt: Select,
    order_by: Iterable[
        Union[str, Tuple[str, Literal["asc", "desc"]]]
    ],
) -> Select

Build the query ordering clauses from submitted parameters.

E.g. _filter_order_by(stmt, ['name']) adds an ORDER BY name clause

_filter_order_by(stmt, [('name', 'asc')]) adds an ORDER BY name ASC clause

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
order_by Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]

a list of columns, or tuples (column, direction)

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def _filter_order_by(
    self,
    stmt: Select,
    order_by: Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
) -> Select:
    """Append one `ORDER BY` clause per requested column.

    E.g.
    `_filter_order_by(stmt, ['name'])`
        adds an `ORDER BY name` clause

    `_filter_order_by(stmt, [('name', 'asc')])`
        adds an `ORDER BY name ASC` clause

    :param stmt: a Select statement
    :param order_by: a list of columns, or tuples (column, direction)
    :return: The ordered query
    :raises UnmappedPropertyError: Propagated from the property validation
        when a column is not a mapped property
    """
    # Maps the direction keyword to the function wrapping the column.
    direction_wrappers: Dict[Literal["asc", "desc"], Callable] = {
        "desc": desc,
        "asc": asc,
    }

    for entry in order_by:
        if isinstance(entry, str):
            # A bare column name: ordered without an explicit direction.
            self._validate_mapped_property(entry)
            ordering = getattr(self._model, entry)
        else:
            # A (column, direction) pair.
            self._validate_mapped_property(entry[0])
            wrap = direction_wrappers[entry[1]]
            ordering = wrap(getattr(self._model, entry[0]))
        stmt = stmt.order_by(ordering)

    return stmt

_find_query

_find_query(
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[
            Union[str, Tuple[str, Literal["asc", "desc"]]]
        ],
    ] = None,
) -> Select

Build a query with column filters and orders.

E.g. q = _find_query(search_params={"name":"John"}) finds all models with name = John

q = _find_query(order_by=["name"]) finds all models ordered by name column

q = _find_query(order_by=[("name", "desc")]) finds all models with reversed order by name column

Parameters:

Name Type Description Default
search_params Union[None, Mapping[str, Any]]

A mapping of property names to values, each used as an equality filter

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]

a list of columns, or tuples (column, direction)

None

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def _find_query(
    self,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> Select:
    """Build a select on the repository model with optional filters and order.

    E.g.
    q = _find_query(search_params={"name":"John"})
        finds all models with name = John

    q = _find_query(order_by=["name"])
        finds all models ordered by `name` column

    q = _find_query(order_by=[("name", "desc")])
        finds all models with reversed order by `name` column

    :param search_params: A mapping of property names to values, used as
        equality filters; skipped when empty or None
    :param order_by: a list of columns, or tuples (column, direction);
        skipped only when None
    :return: The filtered query
    """
    query = select(self._model)

    if search_params:
        query = self._filter_select(query, search_params)

    if order_by is None:
        return query
    return self._filter_order_by(query, order_by)

_paginate_query_by_page

_paginate_query_by_page(
    stmt: Select, page: int, items_per_page: int
) -> Select

Build the query offset and limit clauses from submitted parameters.

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
page int

Page number to retrieve (page 1 is the first page, with no offset)

required
items_per_page int

Number of models to retrieve

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def _paginate_query_by_page(
    self,
    stmt: Select,
    page: int,
    items_per_page: int,
) -> Select:
    """Build the query offset and limit clauses from submitted parameters.

    The page size is passed through `_sanitised_query_limit` and that
    effective size is used for both the limit and the offset, so that
    consecutive pages stay contiguous even when the effective size differs
    from the requested one.

    :param stmt: a Select statement
    :type stmt: Select
    :param page: Page to retrieve; page 1 (or lower) applies no offset
    :type page: int
    :param items_per_page: Number of models to retrieve
    :type items_per_page: int
    :return: The paginated query
    """
    page_size = self._sanitised_query_limit(items_per_page)

    # The offset must be a multiple of the size actually applied as LIMIT:
    # using the raw `items_per_page` would skip rows on later pages whenever
    # the sanitised value is different.
    offset = max((page - 1) * page_size, 0)
    if offset > 0:
        stmt = stmt.offset(offset)

    return stmt.limit(page_size)

_cursor_paginated_query

_cursor_paginated_query(
    stmt: Select,
    cursor_reference: Union[CursorReference, None],
    is_before_cursor: bool = False,
    items_per_page: int = _max_query_limit,
) -> Select

Adds the clauses to retrieve the requested slice of models, after or before the cursor value, plus a model before the slice and one after the slice, to identify if previous or next results are available.

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
cursor_reference Union[CursorReference, None]

A cursor reference containing ordering column and threshold value

required
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

False
items_per_page int

Number of models to retrieve

_max_query_limit

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def _cursor_paginated_query(
    self,
    stmt: Select,
    cursor_reference: Union[CursorReference, None],
    is_before_cursor: bool = False,
    items_per_page: int = _max_query_limit,
) -> Select:
    """Build the statement retrieving a cursor-paginated slice of models.

    Without a cursor the statement is limited to the sanitised page size
    plus one and ordered by ascending primary key. With a cursor, a
    "previous item" subquery and a "slice" subquery (also limited to the
    sanitised page size plus one) are combined with `UNION ALL`, ordered by
    the cursor column and selected as the repository model. The extra rows
    are there to identify if previous or next results are available.

    :param stmt: a Select statement
    :type stmt: Select
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :type cursor_reference: Union[CursorReference, None]
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :type is_before_cursor: bool
    :param items_per_page: Number of models to retrieve
    :type items_per_page: int
    :return: The filtered query
    """
    # One row more than the page size is requested.
    lookahead_limit = self._sanitised_query_limit(items_per_page) + 1

    if cursor_reference:
        previous_item = self._cursor_pagination_previous_item_query(
            stmt, cursor_reference, is_before_cursor
        ).subquery("previous")

        page_slice = self._cursor_pagination_slice_query(
            stmt, cursor_reference, lookahead_limit, is_before_cursor
        ).subquery("slice")

        combined = (
            select(previous_item)
            .union_all(select(page_slice))
            .order_by(cursor_reference.column)
            .subquery("cursor_pagination")
        )
        return select(aliased(self._model, combined))  # type: ignore

    # No cursor: start from the beginning, ordered by primary key.
    first_page = stmt.limit(lookahead_limit)
    return first_page.order_by(asc(self._model_pk()))  # type: ignore

_cursor_pagination_slice_query

_cursor_pagination_slice_query(
    stmt: Select,
    cursor_reference: CursorReference,
    limit: int,
    is_before_cursor: bool,
)

Adds the clauses to retrieve a requested slice of models, after or before the cursor value (excluding the cursor itself)

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
cursor_reference CursorReference

A cursor reference containing ordering column and threshold value

required
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

required
limit int

Number of models to retrieve

required

Returns:

Type Description

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def _cursor_pagination_slice_query(
    self,
    stmt: Select,
    cursor_reference: CursorReference,
    limit: int,
    is_before_cursor: bool,
):
    """Restrict a statement to the models strictly after or before the cursor.

    Rows after the cursor are ordered ascending on the cursor column, rows
    before it descending; the cursor value itself is excluded.

    :param stmt: a Select statement
    :type stmt: Select
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :type cursor_reference: CursorReference
    :param limit: Number of models to retrieve
    :type limit: int
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :type is_before_cursor: bool
    :return: The filtered query
    """
    cursor_column = getattr(self._model, cursor_reference.column)

    if is_before_cursor:
        criterion = cursor_column < cursor_reference.value
        direction = "desc"
    else:
        criterion = cursor_column > cursor_reference.value
        direction = "asc"

    ordered = self._filter_order_by(
        stmt.where(criterion), [(cursor_reference.column, direction)]
    )
    return ordered.limit(limit)

_cursor_pagination_previous_item_query

_cursor_pagination_previous_item_query(
    stmt: Select,
    cursor_reference: CursorReference,
    is_before_cursor: bool,
) -> Select

Adds the clauses to retrieve a single model, after or before the cursor value (including the cursor itself).

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
cursor_reference CursorReference

A cursor reference containing ordering column and threshold value

required
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def _cursor_pagination_previous_item_query(
    self, stmt: Select, cursor_reference: CursorReference, is_before_cursor: bool
) -> Select:
    """Restrict a statement to the single model adjacent to the slice.

    When paginating after the cursor this is the closest model at or below
    the cursor value (descending order); when paginating before the cursor
    it is the closest model at or above it (ascending order). The cursor
    value itself is included.

    :param stmt: a Select statement
    :type stmt: Select
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :type cursor_reference: CursorReference
    :param is_before_cursor: If True the slice holds items before the cursor,
        otherwise items after
    :type is_before_cursor: bool
    :return: The filtered query
    """
    cursor_column = getattr(self._model, cursor_reference.column)

    if is_before_cursor:
        criterion = cursor_column >= cursor_reference.value
        direction = "asc"
    else:
        criterion = cursor_column <= cursor_reference.value
        direction = "desc"

    ordered = self._filter_order_by(
        stmt.where(criterion), [(cursor_reference.column, direction)]
    )
    return ordered.limit(1)

_model_pk

_model_pk() -> str

Retrieves the primary key name from the repository model class.

Returns:

Type Description
str
Source code in sqlalchemy_bind_manager/_repository/base_repository.py
341
342
343
344
345
346
347
348
349
350
351
def _model_pk(self) -> str:
    """
    Retrieves the primary key name from the repository model class.

    :return: The name of the model's single primary key column
    :raises NotImplementedError: When the model has more than one primary
        key column
    """
    pk_columns = inspect(self._model).primary_key  # type: ignore
    if len(pk_columns) <= 1:
        return pk_columns[0].name

    raise NotImplementedError("Composite primary keys are not supported.")

get

get(identifier: PRIMARY_KEY) -> MODEL

Get a model by primary key.

Parameters:

Name Type Description Default
identifier PRIMARY_KEY

The primary key

required

Returns:

Type Description
MODEL

A model instance

Raises:

Type Description
ModelNotFoundError

No model has been found using the primary key

Source code in sqlalchemy_bind_manager/_repository/sync.py
76
77
78
79
80
81
82
83
84
85
86
87
def get(self, identifier: PRIMARY_KEY) -> MODEL:
    """Fetch a single model by its primary key.

    :param identifier: The primary key
    :return: A model instance
    :raises ModelNotFoundError: No model has been found using the primary key
    """
    with self._get_session(commit=False) as active_session:
        found = active_session.get(self._model, identifier)

    # The lookup result is checked only after the session context is exited.
    if found is not None:
        return found
    raise ModelNotFoundError("No rows found for provided primary key.")

get_many

get_many(identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]

Get a list of models by primary keys.

Parameters:

Name Type Description Default
identifiers Iterable[PRIMARY_KEY]

A list of primary keys

required

Returns:

Type Description
List[MODEL]

A list of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
    """Fetch all models whose primary key is among the given identifiers.

    :param identifiers: A list of primary keys
    :return: A list of models
    """
    pk_column = getattr(self._model, self._model_pk())
    query = select(self._model).where(pk_column.in_(identifiers))

    with self._get_session(commit=False) as active_session:
        matches = active_session.execute(query).scalars()
        return list(matches)

save

save(instance: MODEL) -> MODEL

Persist a model.

Parameters:

Name Type Description Default
instance MODEL

A mapped object instance to be persisted

required

Returns:

Type Description
MODEL

The model instance after being persisted

Source code in sqlalchemy_bind_manager/_repository/sync.py
102
103
104
105
106
107
108
109
110
111
def save(self, instance: MODEL) -> MODEL:
    """Add a single validated model to a session.

    :param instance: A mapped object instance to be persisted
    :return: The same model instance that was passed in
    """
    single_item = [instance]
    self._fail_if_invalid_models(single_item)

    with self._get_session() as active_session:
        active_session.add(instance)

    return instance

save_many

save_many(instances: Iterable[MODEL]) -> Iterable[MODEL]

Persist many models in a single database session.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

A list of mapped objects to be persisted

required

Returns:

Type Description
Iterable[MODEL]

The model instances after being persisted

Source code in sqlalchemy_bind_manager/_repository/sync.py
113
114
115
116
117
118
119
120
121
122
def save_many(self, instances: Iterable[MODEL]) -> Iterable[MODEL]:
    """Persist many models in a single database session.

    :param instances: The mapped objects to be persisted; one-shot iterators
        (e.g. generators) are supported
    :return: The model instances that were added to the session. The input
        object itself is returned unless it was a one-shot iterator, in
        which case a list of its items is returned
    """
    # An iterator returns itself from iter(); validating it would exhaust it
    # and leave nothing to add, so it is materialised first.
    if iter(instances) is instances:
        instances = list(instances)

    self._fail_if_invalid_models(instances)
    with self._get_session() as session:
        session.add_all(instances)
    return instances

delete

delete(instance: MODEL) -> None

Deletes a model.

Parameters:

Name Type Description Default
instance MODEL

The model instance

required
Source code in sqlalchemy_bind_manager/_repository/sync.py
124
125
126
127
128
129
130
131
def delete(self, instance: MODEL) -> None:
    """Mark a single validated model for deletion in a session.

    :param instance: The model instance
    """
    single_item = [instance]
    self._fail_if_invalid_models(single_item)

    with self._get_session() as active_session:
        active_session.delete(instance)

delete_many

delete_many(instances: Iterable[MODEL]) -> None

Deletes a collection of models in a single transaction.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

The model instances

required
Source code in sqlalchemy_bind_manager/_repository/sync.py
133
134
135
136
137
138
139
140
141
def delete_many(self, instances: Iterable[MODEL]) -> None:
    """Deletes a collection of models in a single session.

    :param instances: The model instances; one-shot iterators
        (e.g. generators) are supported
    """
    # An iterator returns itself from iter(); validating it would exhaust it
    # and leave nothing to delete, so it is materialised first.
    if iter(instances) is instances:
        instances = list(instances)

    self._fail_if_invalid_models(instances)
    with self._get_session() as session:
        for model in instances:
            session.delete(model)

find

find(
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[
            Union[str, Tuple[str, Literal["asc", "desc"]]]
        ],
    ] = None,
) -> List[MODEL]

Find models using filters.

E.g.

# find all models with name = John
find(search_params={"name":"John"})

# find all models ordered by `name` column
find(order_by=["name"])

# find all models with reversed order by `name` column
find(order_by=[("name", "desc")])

Parameters:

Name Type Description Default
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
List[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def find(
    self,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> List[MODEL]:
    """Return every model matching the given filters.

    E.g.

        # find all models with name = John
        find(search_params={"name":"John"})

        # find all models ordered by `name` column
        find(order_by=["name"])

        # find all models with reversed order by `name` column
        find(order_by=[("name", "desc")])

    :param search_params: A mapping containing equality filters
    :param order_by: Column names, or (column name, direction) tuples,
        to sort the results by
    :return: A collection of models
    """
    query = self._find_query(search_params, order_by)

    with self._get_session() as db_session:
        rows = db_session.execute(query).scalars()
        return list(rows)

paginated_find

paginated_find(
    items_per_page: int,
    page: int = 1,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[
            Union[str, Tuple[str, Literal["asc", "desc"]]]
        ],
    ] = None,
) -> PaginatedResult[MODEL]

Find models using filters and limit/offset pagination. Returned results do include pagination metadata.

E.g.

# find all models with name = John
paginated_find(search_params={"name":"John"})

# find first 50 models with name = John
paginated_find(50, search_params={"name":"John"})

# find 50 models with name = John, skipping 2 pages (100)
paginated_find(50, 3, search_params={"name":"John"})

# find all models ordered by `name` column
paginated_find(order_by=["name"])

# find all models with reversed order by `name` column
paginated_find(order_by=[("name", "desc")])

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
page int

Page to retrieve

1
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
PaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def paginated_find(
    self,
    items_per_page: int,
    page: int = 1,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> PaginatedResult[MODEL]:
    """Find models using filters and limit/offset pagination.

    Two statements are executed in the same session: a count over the
    unpaginated query and the paginated query itself. Both feed the
    pagination metadata of the returned result.

    E.g.

        # find first 50 models with name = John
        paginated_find(50, search_params={"name":"John"})

        # find 50 models with name = John, skipping 2 pages (100)
        paginated_find(50, 3, search_params={"name":"John"})

        # find first 50 models ordered by `name` column
        paginated_find(50, order_by=["name"])

        # find first 50 models with reversed order by `name` column
        paginated_find(50, order_by=[("name", "desc")])

    :param items_per_page: Number of models to retrieve
    :param page: Page to retrieve
    :param search_params: A mapping containing equality filters
    :param order_by: Column names, or (column name, direction) tuples,
        to sort the results by
    :return: The requested page of models together with pagination metadata
    """
    base_query = self._find_query(search_params, order_by)
    page_query = self._paginate_query_by_page(base_query, page, items_per_page)

    with self._get_session() as db_session:
        count_result = db_session.execute(self._count_query(base_query))
        # A missing count is reported as zero items
        total = count_result.scalar() or 0
        rows = list(db_session.execute(page_query).scalars())

        return PaginatedResultPresenter.build_result(
            result_items=rows,
            total_items_count=total,
            page=page,
            items_per_page=self._sanitised_query_limit(items_per_page),
        )

cursor_paginated_find

cursor_paginated_find(
    items_per_page: int,
    cursor_reference: Union[CursorReference, None] = None,
    is_before_cursor: bool = False,
    search_params: Union[None, Mapping[str, Any]] = None,
) -> CursorPaginatedResult[MODEL]

Find models using filters and cursor based pagination. Returned results do include pagination metadata.

E.g.

# finds all models with name = John
cursor_paginated_find(search_params={"name":"John"})

# finds first 50 models with name = John
cursor_paginated_find(50, search_params={"name":"John"})

# finds first 50 models after the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123))

# finds last 50 models before the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123), True)

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
cursor_reference Union[CursorReference, None]

A cursor reference containing ordering column and threshold value

None
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

False
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None

Returns:

Type Description
CursorPaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def cursor_paginated_find(
    self,
    items_per_page: int,
    cursor_reference: Union[CursorReference, None] = None,
    is_before_cursor: bool = False,
    search_params: Union[None, Mapping[str, Any]] = None,
) -> CursorPaginatedResult[MODEL]:
    """Find models using filters and cursor based pagination.

    Two statements are executed in the same session: a count over the
    filtered query and the cursor-paginated query itself. Both feed the
    pagination metadata of the returned result.

    E.g.

        # finds first 50 models with name = John
        cursor_paginated_find(50, search_params={"name":"John"})

        # finds first 50 models after the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123))

        # finds last 50 models before the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123), True)

    :param items_per_page: Number of models to retrieve
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :param search_params: A mapping containing equality filters
    :return: The requested slice of models together with pagination metadata
    """
    base_query = self._find_query(search_params)
    slice_query = self._cursor_paginated_query(
        base_query,
        cursor_reference=cursor_reference,
        is_before_cursor=is_before_cursor,
        items_per_page=items_per_page,
    )

    with self._get_session() as db_session:
        count_result = db_session.execute(self._count_query(base_query))
        # A missing count is reported as zero items
        total = count_result.scalar() or 0
        rows = list(db_session.execute(slice_query).scalars())

        return CursorPaginatedResultPresenter.build_result(
            result_items=rows,
            total_items_count=total,
            items_per_page=self._sanitised_query_limit(items_per_page),
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
        )

sqlalchemy_bind_manager.repository.SQLAlchemyAsyncRepository

Source code in sqlalchemy_bind_manager/_repository/async_.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class SQLAlchemyAsyncRepository(
    Generic[MODEL],
    BaseRepository[MODEL],
):
    """Asynchronous repository for a single SQLAlchemy model.

    The repository works in one of two modes, chosen at construction time:
    with a bind, from which a session is requested for every operation, or
    with an externally provided ``AsyncSession`` that is used as-is.
    """

    # Only set when the repository is created from a bind
    _session_handler: AsyncSessionHandler
    # The caller-provided session, or None when a bind is used
    _external_session: Union[AsyncSession, None]

    def __init__(
        self,
        bind: Union[SQLAlchemyAsyncBind, None] = None,
        session: Union[AsyncSession, None] = None,
        model_class: Union[Type[MODEL], None] = None,
    ) -> None:
        """Create the repository from either a bind or an existing session.

        :param bind: The bind used to create sessions on demand
        :param session: An externally managed session to use for every call
        :param model_class: The model class, forwarded to the base repository
        :raises InvalidConfigError: When both or neither of `bind` and
            `session` are provided
        """
        super().__init__(model_class=model_class)
        # XOR: exactly one of the two must be truthy
        if not (bool(bind) ^ bool(session)):
            raise InvalidConfigError(
                "Either `bind` or `session` have to be used, not both"
            )
        self._external_session = session
        if bind:
            self._session_handler = AsyncSessionHandler(bind)

    async def get(self, identifier: PRIMARY_KEY) -> MODEL:
        """Get a model by primary key.

        :param identifier: The primary key
        :return: A model instance
        :raises ModelNotFoundError: No model has been found using the primary key
        """
        async with self._get_session(commit=False) as session:
            model = await session.get(self._model, identifier)
        if model is None:
            raise ModelNotFoundError("No rows found for provided primary key.")
        return model

    async def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
        """Get a list of models by primary keys.

        :param identifiers: A list of primary keys
        :return: A list of models
        """
        stmt = select(self._model).where(
            getattr(self._model, self._model_pk()).in_(identifiers)
        )

        async with self._get_session(commit=False) as session:
            return list((await session.execute(stmt)).scalars())

    async def save(self, instance: MODEL) -> MODEL:
        """Persist a model.

        :param instance: A mapped object instance to be persisted
        :return: The model instance after being persisted
        """
        self._fail_if_invalid_models([instance])
        async with self._get_session() as session:
            session.add(instance)
        return instance

    async def save_many(
        self,
        instances: Iterable[MODEL],
    ) -> Iterable[MODEL]:
        """Persist many models in a single database session.

        :param instances: The mapped objects to be persisted. Any iterable is
            accepted, including one-shot iterables such as generators.
        :return: The model instances after being persisted. A list or tuple
            is returned unchanged; any other iterable is returned as a list.
        """
        # The models are walked by validation, by `add_all` and by the caller
        # reading the return value. Materialise one-shot iterables so they are
        # not already exhausted after the first pass.
        models = (
            instances if isinstance(instances, (list, tuple)) else list(instances)
        )
        self._fail_if_invalid_models(models)
        async with self._get_session() as session:
            session.add_all(models)
        return models

    async def delete(self, instance: MODEL) -> None:
        """Deletes a model.

        :param instance: The model instance
        """
        self._fail_if_invalid_models([instance])
        async with self._get_session() as session:
            await session.delete(instance)

    async def delete_many(self, instances: Iterable[MODEL]) -> None:
        """Deletes a collection of models using a single session.

        :param instances: The model instances. Any iterable is accepted,
            including one-shot iterables such as generators.
        """
        # Walked twice (validation, then deletion): materialise once so a
        # generator is not empty by the time the delete loop runs.
        models = list(instances)
        self._fail_if_invalid_models(models)
        async with self._get_session() as session:
            for instance in models:
                await session.delete(instance)

    async def find(
        self,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> List[MODEL]:
        """Find models using filters.

        E.g.

            # find all models with name = John
            find(search_params={"name":"John"})

            # find all models ordered by `name` column
            find(order_by=["name"])

            # find all models with reversed order by `name` column
            find(order_by=[("name", "desc")])

        :param search_params: A mapping containing equality filters
        :param order_by: Column names, or (column name, direction) tuples,
            to sort the results by
        :return: A collection of models
        """
        stmt = self._find_query(search_params, order_by)

        async with self._get_session() as session:
            result = await session.execute(stmt)
            return list(result.scalars())

    async def paginated_find(
        self,
        items_per_page: int,
        page: int = 1,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> PaginatedResult[MODEL]:
        """Find models using filters and limit/offset pagination. Returned results
        do include pagination metadata.

        E.g.

            # find first 50 models with name = John
            paginated_find(50, search_params={"name":"John"})

            # find 50 models with name = John, skipping 2 pages (100)
            paginated_find(50, 3, search_params={"name":"John"})

            # find first 50 models ordered by `name` column
            paginated_find(50, order_by=["name"])

            # find first 50 models with reversed order by `name` column
            paginated_find(50, order_by=[("name", "desc")])

        :param items_per_page: Number of models to retrieve
        :param page: Page to retrieve
        :param search_params: A mapping containing equality filters
        :param order_by: Column names, or (column name, direction) tuples,
            to sort the results by
        :return: The requested page of models together with pagination metadata
        """
        find_stmt = self._find_query(search_params, order_by)
        paginated_stmt = self._paginate_query_by_page(find_stmt, page, items_per_page)

        async with self._get_session() as session:
            # Count over the unpaginated query; a missing count becomes zero
            total_items_count = (
                await session.execute(self._count_query(find_stmt))
            ).scalar() or 0
            result_items = list((await session.execute(paginated_stmt)).scalars())

            return PaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                page=page,
                items_per_page=self._sanitised_query_limit(items_per_page),
            )

    async def cursor_paginated_find(
        self,
        items_per_page: int,
        cursor_reference: Union[CursorReference, None] = None,
        is_before_cursor: bool = False,
        search_params: Union[None, Mapping[str, Any]] = None,
    ) -> CursorPaginatedResult[MODEL]:
        """Find models using filters and cursor based pagination. Returned results
        do include pagination metadata.

        E.g.

            # finds first 50 models with name = John
            cursor_paginated_find(50, search_params={"name":"John"})

            # finds first 50 models after the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123))

            # finds last 50 models before the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123), True)

        :param items_per_page: Number of models to retrieve
        :param cursor_reference: A cursor reference containing ordering column
            and threshold value
        :param is_before_cursor: If True it will return items before the cursor,
            otherwise items after
        :param search_params: A mapping containing equality filters
        :return: The requested slice of models together with pagination metadata
        """
        find_stmt = self._find_query(search_params)
        paginated_stmt = self._cursor_paginated_query(
            find_stmt,
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
            items_per_page=items_per_page,
        )

        async with self._get_session() as session:
            # Count over the filtered query; a missing count becomes zero
            total_items_count = (
                await session.execute(self._count_query(find_stmt))
            ).scalar() or 0
            result_items = list((await session.execute(paginated_stmt)).scalars())

            return CursorPaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                items_per_page=self._sanitised_query_limit(items_per_page),
                cursor_reference=cursor_reference,
                is_before_cursor=is_before_cursor,
            )

    @asynccontextmanager
    async def _get_session(self, commit: bool = True) -> AsyncIterator[AsyncSession]:
        """Yield the session to use for one repository operation.

        Without an external session, a session is requested from the session
        handler, which receives the negated `commit` flag (presumably its
        read-only switch; confirm against `AsyncSessionHandler.get_session`).
        With an external session, that session is yielded unchanged and
        `commit` is not used.

        :param commit: Whether the operation needs its changes committed
        :return: An async context manager yielding the session
        """
        if not self._external_session:
            async with self._session_handler.get_session(not commit) as _session:
                yield _session
        else:
            yield self._external_session

_is_mapped_class

_is_mapped_class(class_: Type[MODEL]) -> bool

Checks if the class is mapped in SQLAlchemy.

Parameters:

Name Type Description Default
class_ Type[MODEL]

the model class

required

Returns:

Type Description
bool

True if the Type is mapped, False otherwise

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
66
67
68
69
70
71
72
73
74
75
76
77
def _is_mapped_class(self, class_: Type[MODEL]) -> bool:
    """Tell whether SQLAlchemy has a mapper for the given class.

    The check asks `class_mapper` for the mapper and treats an
    `UnmappedClassError` as "not mapped".

    :param class_: the model class
    :return: True if the Type is mapped, False otherwise
    :rtype: bool
    """
    try:
        class_mapper(class_)
    except UnmappedClassError:
        return False
    return True

_validate_mapped_property

_validate_mapped_property(property_name: str) -> None

Checks if a property is mapped in the model class.

Parameters:

Name Type Description Default
property_name str

The name of the property to be evaluated.

required

Raises:

Type Description
UnmappedPropertyError

When the property is not mapped.

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
79
80
81
82
83
84
85
86
87
88
89
90
91
def _validate_mapped_property(self, property_name: str) -> None:
    """Ensure the given name is a mapped column attribute of the model.

    :param property_name: The name of the property to be evaluated.
    :type property_name: str
    :raises UnmappedPropertyError: When the property is not mapped.
    """
    model_mapper: Mapper = class_mapper(self._model)
    if property_name in model_mapper.column_attrs:
        return
    raise UnmappedPropertyError(
        f"Property `{property_name}` is not mapped"
        f" in the ORM for model `{self._model}`"
    )

_filter_select

_filter_select(
    stmt: Select, search_params: Mapping[str, Any]
) -> Select

Build the query filtering clauses from submitted parameters.

E.g. _filter_select(stmt, {"name": "John"}) adds a WHERE name = 'John' clause

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
search_params Mapping[str, Any]

A mapping of column names to the values they must equal, used as equality filters

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def _filter_select(self, stmt: Select, search_params: Mapping[str, Any]) -> Select:
    """Build the query filtering clauses from submitted parameters.

    E.g.
    _filter_select(stmt, name="John") adds a `WHERE name = John` statement

    :param stmt: a Select statement
    :type stmt: Select
    :param search_params: Any keyword argument to be used as equality filter
    :type search_params: Mapping[str, Any]
    :return: The filtered query
    """
    # TODO: Add support for relationship eager load
    for k, v in search_params.items():
        """
        This acts as a TypeGuard but using TypeGuard typing would break
        compatibility with python < 3.10, for the moment we prefer to ignore
        typing issues here
        """
        self._validate_mapped_property(k)
        stmt = stmt.where(getattr(self._model, k) == v)
    return stmt

_filter_order_by

_filter_order_by(
    stmt: Select,
    order_by: Iterable[
        Union[str, Tuple[str, Literal["asc", "desc"]]]
    ],
) -> Select

Build the query ordering clauses from submitted parameters.

E.g. _filter_order_by(stmt, ['name']) adds a ORDER BY name statement

_filter_order_by(stmt, [('name', 'asc')]) adds a ORDER BY name ASC statement

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
order_by Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]

a list of columns, or tuples (column, direction)

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def _filter_order_by(
    self,
    stmt: Select,
    order_by: Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
) -> Select:
    """Append one ORDER BY clause per requested column.

    E.g.
    `_filter_order_by(stmt, ['name'])`
        adds a `ORDER BY name` statement

    `_filter_order_by(stmt, [('name', 'asc')])`
        adds a `ORDER BY name ASC` statement

    Every column name is validated with `_validate_mapped_property` before
    it is used.

    :param stmt: a Select statement
    :param order_by: a list of columns, or tuples (column, direction)
    :return: The filtered query
    """
    direction_wrappers: Dict[Literal["asc", "desc"], Callable] = {
        "asc": asc,
        "desc": desc,
    }

    for entry in order_by:
        if not isinstance(entry, str):
            # (column, direction) pair: wrap the column in the matching modifier
            self._validate_mapped_property(entry[0])
            wrap = direction_wrappers[entry[1]]
            stmt = stmt.order_by(wrap(getattr(self._model, entry[0])))
            continue

        # Bare column name: ordered without an explicit direction
        self._validate_mapped_property(entry)
        stmt = stmt.order_by(getattr(self._model, entry))

    return stmt

_find_query

_find_query(
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[
            Union[str, Tuple[str, Literal["asc", "desc"]]]
        ],
    ] = None,
) -> Select

Build a query with column filters and orders.

E.g. q = _find_query(search_params={"name":"John"}) finds all models with name = John

q = _find_query(order_by=["name"]) finds all models ordered by name column

q = _find_query(order_by=[("name", "desc")]) finds all models with reversed order by name column

Parameters:

Name Type Description Default
search_params Union[None, Mapping[str, Any]]

Any keyword argument to be used as equality filter

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]

a list of columns, or tuples (column, direction)

None

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def _find_query(
    self,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> Select:
    """Build a select over the model with optional filters and ordering.

    E.g.
    q = _find_query(search_params={"name":"John"})
        finds all models with name = John

    q = _find_query(order_by=["name"])
        finds all models ordered by `name` column

    q = _find_query(order_by=[("name", "desc")])
        finds all models with reversed order by `name` column

    :param search_params: A mapping of column names to the values they must
        equal; skipped when empty or None
    :param order_by: a list of columns, or tuples (column, direction);
        skipped only when None
    :return: The filtered query
    """
    query = select(self._model)

    if search_params:
        query = self._filter_select(query, search_params)

    if order_by is None:
        return query
    return self._filter_order_by(query, order_by)

_paginate_query_by_page

_paginate_query_by_page(
    stmt: Select, page: int, items_per_page: int
) -> Select

Build the query offset and limit clauses from submitted parameters.

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
page int

Page to retrieve, counted from 1; the models of the preceding pages are skipped

required
items_per_page int

Number of models to retrieve

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def _paginate_query_by_page(
    self,
    stmt: Select,
    page: int,
    items_per_page: int,
) -> Select:
    """Build the query offset and limit clauses from submitted parameters.

    :param stmt: a Select statement
    :type stmt: Select
    :param page: Number of models to skip
    :type page: int
    :param items_per_page: Number of models to retrieve
    :type items_per_page: int
    :return: The filtered query
    """

    _offset = max((page - 1) * items_per_page, 0)
    if _offset > 0:
        stmt = stmt.offset(_offset)

    _limit = self._sanitised_query_limit(items_per_page)
    stmt = stmt.limit(_limit)

    return stmt

_cursor_paginated_query

_cursor_paginated_query(
    stmt: Select,
    cursor_reference: Union[CursorReference, None],
    is_before_cursor: bool = False,
    items_per_page: int = _max_query_limit,
) -> Select

Adds the clauses to retrieve the requested slice of models, after or before the cursor value, plus a model before the slice and one after the slice, to identify if previous or next results are available.

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
cursor_reference Union[CursorReference, None]

A cursor reference containing ordering column and threshold value

required
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

False
items_per_page int

Number of models to retrieve

_max_query_limit

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def _cursor_paginated_query(
    self,
    stmt: Select,
    cursor_reference: Union[CursorReference, None],
    is_before_cursor: bool = False,
    items_per_page: int = _max_query_limit,
) -> Select:
    """Build the query that fetches one cursor-paginated slice of models.

    Without a cursor the statement is simply limited to one row more than
    the sanitised page size and ordered by ascending primary key. With a
    cursor, the result is the union of a single "previous" row and the
    requested slice (also fetched with one extra row), ordered by the cursor
    column; the extra rows let the caller identify whether previous or next
    results are available.

    :param stmt: a Select statement
    :type stmt: Select
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :type cursor_reference: Union[CursorReference, None]
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :type is_before_cursor: bool
    :param items_per_page: Number of models to retrieve
    :type items_per_page: int
    :return: The filtered query
    """
    # One row beyond the page size is always requested
    fetch_limit = self._sanitised_query_limit(items_per_page) + 1

    if not cursor_reference:
        limited = stmt.limit(fetch_limit)
        return limited.order_by(asc(self._model_pk()))  # type: ignore

    previous_item = self._cursor_pagination_previous_item_query(
        stmt, cursor_reference, is_before_cursor
    ).subquery("previous")

    page_slice = self._cursor_pagination_slice_query(
        stmt, cursor_reference, fetch_limit, is_before_cursor
    ).subquery("slice")

    combined = (
        select(previous_item)
        .union_all(select(page_slice))
        .order_by(cursor_reference.column)
        .subquery("cursor_pagination")
    )
    # Map the combined rows back onto the model class
    return select(aliased(self._model, combined))  # type: ignore

_cursor_pagination_slice_query

_cursor_pagination_slice_query(
    stmt: Select,
    cursor_reference: CursorReference,
    limit: int,
    is_before_cursor: bool,
)

Adds the clauses to retrieve a requested slice of models, after or before the cursor value (excluding the cursor itself)

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
cursor_reference CursorReference

A cursor reference containing ordering column and threshold value

required
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

required
limit int

Number of models to retrieve

required

Returns:

Type Description

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def _cursor_pagination_slice_query(
    self,
    stmt: Select,
    cursor_reference: CursorReference,
    limit: int,
    is_before_cursor: bool,
):
    """Adds the clauses to retrieve a requested slice of models,
    after or before the cursor value (excluding the cursor itself)

    :param stmt: a Select statement
    :type stmt: Select
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :type cursor_reference: Union[CursorReference, None]
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :type is_before_cursor: bool
    :param limit: Number of models to retrieve
    :type limit: int
    :return: The filtered query
    """
    if not is_before_cursor:
        page_query = stmt.where(
            getattr(self._model, cursor_reference.column) > cursor_reference.value
        )
        page_query = self._filter_order_by(
            page_query, [(cursor_reference.column, "asc")]
        )
    else:
        page_query = stmt.where(
            getattr(self._model, cursor_reference.column) < cursor_reference.value
        )
        page_query = self._filter_order_by(
            page_query, [(cursor_reference.column, "desc")]
        )
    return page_query.limit(limit)

_cursor_pagination_previous_item_query

_cursor_pagination_previous_item_query(
    stmt: Select,
    cursor_reference: CursorReference,
    is_before_cursor: bool,
) -> Select

Adds the clauses to retrieve a single model, after or before the cursor value (including the cursor itself).

Parameters:

Name Type Description Default
stmt Select

a Select statement

required
cursor_reference CursorReference

A cursor reference containing ordering column and threshold value

required
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

required

Returns:

Type Description
Select

The filtered query

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def _cursor_pagination_previous_item_query(
    self, stmt: Select, cursor_reference: CursorReference, is_before_cursor: bool
) -> Select:
    """Builds the query returning at most one model on the cursor boundary.

    The comparison is inclusive, so the row matching the cursor value
    itself can be returned:

    - when `is_before_cursor` is False the query keeps rows whose column is
      lower than or equal to the cursor value, in descending order
    - when `is_before_cursor` is True the query keeps rows whose column is
      greater than or equal to the cursor value, in ascending order

    :param stmt: a Select statement
    :type stmt: Select
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :type cursor_reference: CursorReference
    :param is_before_cursor: Selects which of the two comparisons/orderings
        described above is applied
    :type is_before_cursor: bool
    :return: The filtered query, limited to a single row
    """
    cursor_column = getattr(self._model, cursor_reference.column)

    if is_before_cursor:
        return self._filter_order_by(
            stmt.where(cursor_column >= cursor_reference.value),
            [(cursor_reference.column, "asc")],
        ).limit(1)

    return self._filter_order_by(
        stmt.where(cursor_column <= cursor_reference.value),
        [(cursor_reference.column, "desc")],
    ).limit(1)

_model_pk

_model_pk() -> str

Retrieves the primary key name from the repository model class.

Returns:

Type Description
str

The name of the primary key column

Source code in sqlalchemy_bind_manager/_repository/base_repository.py
341
342
343
344
345
346
347
348
349
350
351
def _model_pk(self) -> str:
    """
    Looks up the primary key of the repository model class and returns
    the name of its only column.

    :return: The name of the primary key column
    :raises NotImplementedError: The model has more than one primary key column
    """
    pk_columns = inspect(self._model).primary_key  # type: ignore
    if len(pk_columns) <= 1:
        return pk_columns[0].name

    raise NotImplementedError("Composite primary keys are not supported.")

get async

get(identifier: PRIMARY_KEY) -> MODEL

Get a model by primary key.

Parameters:

Name Type Description Default
identifier PRIMARY_KEY

The primary key

required

Returns:

Type Description
MODEL

A model instance

Raises:

Type Description
ModelNotFoundError

No model has been found using the primary key

Source code in sqlalchemy_bind_manager/_repository/async_.py
76
77
78
79
80
81
82
83
84
85
86
87
async def get(self, identifier: PRIMARY_KEY) -> MODEL:
    """Fetch a single model using its primary key.

    The lookup runs in a session requested with `commit=False`.

    :param identifier: The primary key
    :return: A model instance
    :raises ModelNotFoundError: No model has been found using the primary key
    """
    async with self._get_session(commit=False) as db_session:
        found = await db_session.get(self._model, identifier)

    if found is not None:
        return found
    raise ModelNotFoundError("No rows found for provided primary key.")

get_many async

get_many(identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]

Get a list of models by primary keys.

Parameters:

Name Type Description Default
identifiers Iterable[PRIMARY_KEY]

A list of primary keys

required

Returns:

Type Description
List[MODEL]

A list of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
    """Fetch every model whose primary key is among the given ones.

    The query runs in a session requested with `commit=False`.

    :param identifiers: A list of primary keys
    :return: A list of models
    """
    pk_column = getattr(self._model, self._model_pk())
    query = select(self._model).where(pk_column.in_(identifiers))

    async with self._get_session(commit=False) as db_session:
        rows = await db_session.execute(query)
        return list(rows.scalars())

save async

save(instance: MODEL) -> MODEL

Persist a model.

Parameters:

Name Type Description Default
instance MODEL

A mapped object instance to be persisted

required

Returns:

Type Description
MODEL

The model instance after being persisted

Source code in sqlalchemy_bind_manager/_repository/async_.py
102
103
104
105
106
107
108
109
110
111
async def save(self, instance: MODEL) -> MODEL:
    """Add a model to a session so that it gets persisted.

    The instance is validated with `_fail_if_invalid_models` before
    being added.

    :param instance: A mapped object instance to be persisted
    :return: The model instance after being persisted
    """
    candidates = [instance]
    self._fail_if_invalid_models(candidates)

    async with self._get_session() as db_session:
        db_session.add(instance)

    return instance

save_many async

save_many(instances: Iterable[MODEL]) -> Iterable[MODEL]

Persist many models in a single database session.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

A list of mapped objects to be persisted

required

Returns:

Type Description
Iterable[MODEL]

The model instances after being persisted

Source code in sqlalchemy_bind_manager/_repository/async_.py
113
114
115
116
117
118
119
120
121
122
123
124
125
async def save_many(
    self,
    instances: Iterable[MODEL],
) -> Iterable[MODEL]:
    """Persist many models using a single database session.

    :param instances: An iterable of mapped objects to be persisted. It is
        consumed exactly once, so one-shot iterators (e.g. generators)
        are supported.
    :return: A list containing the model instances after being persisted
    """
    # `instances` may be a one-shot iterator: the validation step receives
    # it before `add_all`, and presumably iterates it, which would leave
    # nothing to add. Materialise it once and reuse the list everywhere.
    models = list(instances)
    self._fail_if_invalid_models(models)
    async with self._get_session() as session:
        session.add_all(models)
    return models

delete async

delete(instance: MODEL) -> None

Deletes a model.

Parameters:

Name Type Description Default
instance MODEL

The model instance

required
Source code in sqlalchemy_bind_manager/_repository/async_.py
127
128
129
130
131
132
133
134
async def delete(self, instance: MODEL) -> None:
    """Remove a model through a session.

    The instance is validated with `_fail_if_invalid_models` before
    being deleted.

    :param instance: The model instance
    """
    candidates = [instance]
    self._fail_if_invalid_models(candidates)

    async with self._get_session() as db_session:
        await db_session.delete(instance)

delete_many async

delete_many(instances: Iterable[MODEL]) -> None

Deletes a collection of models in a single transaction.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

The model instances

required
Source code in sqlalchemy_bind_manager/_repository/async_.py
136
137
138
139
140
141
142
143
144
async def delete_many(self, instances: Iterable[MODEL]) -> None:
    """Deletes a collection of models using a single session.

    :param instances: The model instances. The iterable is consumed exactly
        once, so one-shot iterators (e.g. generators) are supported.
    """
    # `instances` may be a one-shot iterator: the validation step receives
    # it before the deletion loop, and presumably iterates it, which would
    # leave nothing to delete. Materialise it once and reuse the list.
    models = list(instances)
    self._fail_if_invalid_models(models)
    async with self._get_session() as session:
        for instance in models:
            await session.delete(instance)

find async

find(
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[
            Union[str, Tuple[str, Literal["asc", "desc"]]]
        ],
    ] = None,
) -> List[MODEL]

Find models using filters.

E.g.

# find all models with name = John
find(search_params={"name":"John"})

# find all models ordered by `name` column
find(order_by=["name"])

# find all models with reversed order by `name` column
find(order_by=[("name", "desc")])

Parameters:

Name Type Description Default
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
List[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
async def find(
    self,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> List[MODEL]:
    """Find models using filters.

    E.g.

        # find all models with name = John
        find(search_params={"name":"John"})

        # find all models ordered by `name` column
        find(order_by=["name"])

        # find all models with reversed order by `name` column
        find(order_by=[("name", "desc")])

    :param search_params: A mapping containing equality filters
    :param order_by: Ordering criteria: each item is either a column name
        or a `(column name, "asc" | "desc")` tuple
    :return: A collection of models
    """
    stmt = self._find_query(search_params, order_by)

    # Read-only query: request the session with `commit=False`, as the
    # other read methods (`get`, `get_many`) do.
    async with self._get_session(commit=False) as session:
        result = await session.execute(stmt)
        return list(result.scalars())

paginated_find async

paginated_find(
    items_per_page: int,
    page: int = 1,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[
            Union[str, Tuple[str, Literal["asc", "desc"]]]
        ],
    ] = None,
) -> PaginatedResult[MODEL]

Find models using filters and limit/offset pagination. Returned results do include pagination metadata.

E.g.

# find the first 10 models with name = John
paginated_find(10, search_params={"name":"John"})

# find first 50 models with name = John
paginated_find(50, search_params={"name":"John"})

# find 50 models with name = John, skipping 2 pages (100)
paginated_find(50, 3, search_params={"name":"John"})

# find the first 50 models ordered by `name` column
paginated_find(50, order_by=["name"])

# find the first 50 models with reversed order by `name` column
paginated_find(50, order_by=[("name", "desc")])

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
page int

Page to retrieve

1
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
PaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
async def paginated_find(
    self,
    items_per_page: int,
    page: int = 1,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> PaginatedResult[MODEL]:
    """Find models using filters and limit/offset pagination. Returned results
    include pagination metadata.

    `items_per_page` is a required argument.

    E.g.

        # find the first 10 models with name = John
        paginated_find(10, search_params={"name":"John"})

        # find first 50 models with name = John
        paginated_find(50, search_params={"name":"John"})

        # find 50 models with name = John, skipping 2 pages (100)
        paginated_find(50, 3, search_params={"name":"John"})

        # find the first 50 models ordered by `name` column
        paginated_find(50, order_by=["name"])

        # find the first 50 models with reversed order by `name` column
        paginated_find(50, order_by=[("name", "desc")])

    :param items_per_page: Number of models to retrieve
    :param page: Page to retrieve
    :param search_params: A mapping containing equality filters
    :param order_by: Ordering criteria: each item is either a column name
        or a `(column name, "asc" | "desc")` tuple
    :return: The models in the requested page, with pagination metadata
    """
    find_stmt = self._find_query(search_params, order_by)
    paginated_stmt = self._paginate_query_by_page(find_stmt, page, items_per_page)

    # Read-only queries: request the session with `commit=False`, as the
    # other read methods (`get`, `get_many`) do.
    async with self._get_session(commit=False) as session:
        # The count runs on the unpaginated statement; a missing scalar
        # is reported as 0.
        total_items_count = (
            await session.execute(self._count_query(find_stmt))
        ).scalar() or 0
        result_items = list((await session.execute(paginated_stmt)).scalars())

        return PaginatedResultPresenter.build_result(
            result_items=result_items,
            total_items_count=total_items_count,
            page=page,
            # Report the page size after it went through the limit sanitiser
            items_per_page=self._sanitised_query_limit(items_per_page),
        )

cursor_paginated_find async

cursor_paginated_find(
    items_per_page: int,
    cursor_reference: Union[CursorReference, None] = None,
    is_before_cursor: bool = False,
    search_params: Union[None, Mapping[str, Any]] = None,
) -> CursorPaginatedResult[MODEL]

Find models using filters and cursor based pagination. Returned results do include pagination metadata.

E.g.

# finds the first 10 models with name = John
cursor_paginated_find(10, search_params={"name":"John"})

# finds first 50 models with name = John
cursor_paginated_find(50, search_params={"name":"John"})

# finds first 50 models after the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123))

# finds last 50 models before the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123), True)

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
cursor_reference Union[CursorReference, None]

A cursor reference containing ordering column and threshold value

None
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

False
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None

Returns:

Type Description
CursorPaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
async def cursor_paginated_find(
    self,
    items_per_page: int,
    cursor_reference: Union[CursorReference, None] = None,
    is_before_cursor: bool = False,
    search_params: Union[None, Mapping[str, Any]] = None,
) -> CursorPaginatedResult[MODEL]:
    """Find models using filters and cursor based pagination. Returned results
    include pagination metadata.

    `items_per_page` is a required argument.

    E.g.

        # finds the first 10 models with name = John
        cursor_paginated_find(10, search_params={"name":"John"})

        # finds first 50 models with name = John
        cursor_paginated_find(50, search_params={"name":"John"})

        # finds first 50 models after the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123))

        # finds last 50 models before the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123), True)

    :param items_per_page: Number of models to retrieve
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :param search_params: A mapping containing equality filters
    :return: The models in the requested page, with pagination metadata
    """
    find_stmt = self._find_query(search_params)
    paginated_stmt = self._cursor_paginated_query(
        find_stmt,
        cursor_reference=cursor_reference,
        is_before_cursor=is_before_cursor,
        items_per_page=items_per_page,
    )

    # Read-only queries: request the session with `commit=False`, as the
    # other read methods (`get`, `get_many`) do.
    async with self._get_session(commit=False) as session:
        # The count runs on the unpaginated statement; a missing scalar
        # is reported as 0.
        total_items_count = (
            await session.execute(self._count_query(find_stmt))
        ).scalar() or 0
        # `list()` always yields a list, so no empty-list fallback is needed
        result_items = list((await session.execute(paginated_stmt)).scalars())

        return CursorPaginatedResultPresenter.build_result(
            result_items=result_items,
            total_items_count=total_items_count,
            # Report the page size after it went through the limit sanitiser
            items_per_page=self._sanitised_query_limit(items_per_page),
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
        )

sqlalchemy_bind_manager.repository.PaginatedResult

The result of a paginated query.

Parameters:

Name Type Description Default
items List[MODEL]

The models returned by the query

required
page_info PageInfo

The pagination metadata

required
Source code in sqlalchemy_bind_manager/_repository/common.py
56
57
58
59
60
61
62
63
64
65
66
67
class PaginatedResult(BaseModel, Generic[MODEL]):
    """
    The result of a paginated query.

    Generic over ``MODEL``, the type of the items carried by the page.
    Both fields are required: neither declares a default value.

    :param items: The models returned by the query
    :type items: List[MODEL]
    :param page_info: The pagination metadata
    :type page_info: PageInfo
    """

    # Models returned by the query
    items: List[MODEL]
    # Pagination metadata
    page_info: PageInfo

sqlalchemy_bind_manager.repository.PageInfo

Paginated query metadata.

Parameters:

Name Type Description Default
page int

The current page number

required
items_per_page int

The maximum number of items in a page.

required
total_pages int

The number of available pages.

required
total_items int

The total items in all the pages.

required
has_next_page bool

True if there is a next page.

required
has_previous_page bool

True if there is a previous page.

required
Source code in sqlalchemy_bind_manager/_repository/common.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class PageInfo(BaseModel):
    """
    Paginated query metadata.

    All fields are required: none of them declares a default value.

    :param page: The current page number
    :type page: int
    :param items_per_page: The maximum number of items in a page.
    :type items_per_page: int
    :param total_pages: The number of available pages.
    :type total_pages: int
    :param total_items: The total items in all the pages.
    :type total_items: int
    :param has_next_page: True if there is a next page.
    :type has_next_page: bool
    :param has_previous_page: True if there is a previous page.
    :type has_previous_page: bool
    """

    # Current page number
    page: int
    # Maximum number of items in a page
    items_per_page: int
    # Number of available pages
    total_pages: int
    # Total items in all the pages
    total_items: int
    # Navigation flags
    has_next_page: bool
    has_previous_page: bool

sqlalchemy_bind_manager.repository.CursorPaginatedResult

The result of a cursor paginated query.

Parameters:

Name Type Description Default
items List[MODEL]

The models returned by the query

required
page_info CursorPageInfo

The pagination metadata

required
Source code in sqlalchemy_bind_manager/_repository/common.py
103
104
105
106
107
108
109
110
111
112
113
114
class CursorPaginatedResult(BaseModel, Generic[MODEL]):
    """
    The result of a cursor paginated query.

    Generic over ``MODEL``, the type of the items carried by the page.
    Both fields are required: neither declares a default value.

    :param items: The models returned by the query
    :type items: List[MODEL]
    :param page_info: The pagination metadata
    :type page_info: CursorPageInfo
    """

    # Models returned by the query
    items: List[MODEL]
    # Cursor pagination metadata
    page_info: CursorPageInfo

sqlalchemy_bind_manager.repository.CursorPageInfo

Cursor-paginated query metadata.

Parameters:

Name Type Description Default
items_per_page int

The maximum number of items in a page.

required
total_items int

The total items in all the pages.

required
has_next_page bool

True if there is a next page.

required
has_previous_page bool

True if there is a previous page.

required
start_cursor Union[CursorReference, None]

The cursor pointing to the first item in the page, if at least one item is returned.

required
end_cursor Union[CursorReference, None]

The cursor pointing to the last item in the page, if at least one item is returned.

required
Source code in sqlalchemy_bind_manager/_repository/common.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class CursorPageInfo(BaseModel):
    """
    Cursor-paginated query metadata.

    `items_per_page` and `total_items` are required; the remaining fields
    have defaults (`False` for the flags, `None` for the cursors).

    :param items_per_page: The maximum number of items in a page.
    :type items_per_page: int
    :param total_items: The total items in all the pages.
    :type total_items: int
    :param has_next_page: True if there is a next page. Defaults to False.
    :type has_next_page: bool
    :param has_previous_page: True if there is a previous page.
        Defaults to False.
    :type has_previous_page: bool
    :param start_cursor: The cursor pointing to the first item in the page,
        if at least one item is returned. Defaults to None.
    :type start_cursor: Union[CursorReference, None]
    :param end_cursor: The cursor pointing to the last item in the page,
        if at least one item is returned. Defaults to None.
    :type end_cursor: Union[CursorReference, None]
    """

    # Required counters
    items_per_page: int
    total_items: int
    # Navigation flags, False unless explicitly set
    has_next_page: bool = False
    has_previous_page: bool = False
    # Page boundary cursors, None unless explicitly set
    start_cursor: Union[CursorReference, None] = None
    end_cursor: Union[CursorReference, None] = None