Skip to content

Repository

sqlalchemy_bind_manager.repository

sqlalchemy_bind_manager.repository.SQLAlchemyRepository

Source code in sqlalchemy_bind_manager/_repository/sync.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class SQLAlchemyRepository(
    Generic[MODEL],
    BaseRepository[MODEL],
    SQLAlchemyRepositoryInterface[MODEL],
):
    """Synchronous repository exposing CRUD and search helpers for one model.

    Sessions come either from a ``SessionHandler`` created from the given
    bind, or from a single session supplied by the caller.
    """

    # Only assigned when the repository is created with a bind.
    _session_handler: SessionHandler
    # Session supplied by the caller; ``None`` when a bind is used instead.
    _external_session: Union[Session, None]

    def __init__(
        self,
        bind: Union[SQLAlchemyBind, None] = None,
        session: Union[Session, None] = None,
        model_class: Union[Type[MODEL], None] = None,
    ) -> None:
        """Create the repository from exactly one of `bind` or `session`.

        :param bind: Bind used to create the internal session handler
        :param session: Externally managed session, yielded as-is
        :param model_class: Forwarded to the base repository initialiser
        :raises InvalidConfigError: Both or neither of `bind` and `session`
            were provided
        """
        super().__init__(model_class=model_class)
        # XOR: exactly one of the two session sources must be truthy.
        if not (bool(bind) ^ bool(session)):
            raise InvalidConfigError(
                "Either `bind` or `session` have to be used, not both"
            )
        self._external_session = session
        if bind:
            self._session_handler = SessionHandler(bind)

    def get(self, identifier: PRIMARY_KEY) -> MODEL:
        """Get a model by primary key.

        :param identifier: The primary key
        :return: A model instance
        :raises ModelNotFoundError: No model has been found using the primary key
        """
        with self._get_session(commit=False) as session:
            model = session.get(self._model, identifier)
        if model is None:
            raise ModelNotFoundError("No rows found for provided primary key.")
        return model

    def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
        """Get a list of models by primary keys.

        :param identifiers: A list of primary keys
        :return: A list of models
        """
        stmt = select(self._model).where(
            getattr(self._model, self._model_pk()).in_(identifiers)
        )

        with self._get_session(commit=False) as session:
            return list(session.execute(stmt).scalars())

    def save(self, instance: MODEL) -> MODEL:
        """Persist a model.

        :param instance: A mapped object instance to be persisted
        :return: The model instance after being persisted
        """
        self._fail_if_invalid_models([instance])
        with self._get_session() as session:
            session.add(instance)
        return instance

    def save_many(self, instances: Iterable[MODEL]) -> Iterable[MODEL]:
        """Persist many models within a single session block.

        :param instances: An iterable of mapped objects to be persisted;
            one-shot iterables (e.g. generators) are supported
        :return: A list holding the model instances after being persisted
        """
        # The collection is read by validation, by the session and by the
        # caller, so a one-shot iterable has to be materialised first.
        models = list(instances)
        self._fail_if_invalid_models(models)
        with self._get_session() as session:
            session.add_all(models)
        return models

    def delete(self, instance: MODEL) -> None:
        """Deletes a model.

        :param instance: The model instance
        """
        self._fail_if_invalid_models([instance])
        with self._get_session() as session:
            session.delete(instance)

    def delete_many(self, instances: Iterable[MODEL]) -> None:
        """Deletes a collection of models within a single session block.

        :param instances: An iterable of model instances; one-shot iterables
            (e.g. generators) are supported
        """
        # Read twice below (validation, then deletion): materialise once.
        models = list(instances)
        self._fail_if_invalid_models(models)
        with self._get_session() as session:
            for model in models:
                session.delete(model)

    def find(
        self,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> List[MODEL]:
        """Find models using filters.

        E.g.

            # find all models with name = John
            find(search_params={"name":"John"})

            # find all models ordered by `name` column
            find(order_by=["name"])

            # find all models with reversed order by `name` column
            find(order_by=[("name", "desc")])

        :param search_params: A mapping containing equality filters
        :param order_by: Column names, or `(column name, "asc" | "desc")`
            tuples, used to order the results
        :return: A collection of models
        """
        stmt = self._find_query(search_params, order_by)

        # NOTE(review): `get`/`get_many` request `commit=False`, while this
        # read-only query uses the default - confirm this is intended.
        with self._get_session() as session:
            result = session.execute(stmt)
            return list(result.scalars())

    def paginated_find(
        self,
        items_per_page: int,
        page: int = 1,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> PaginatedResult[MODEL]:
        """Find models using filters and limit/offset pagination. Returned results
        do include pagination metadata.

        E.g.

            # find first 50 models with name = John
            paginated_find(50, search_params={"name":"John"})

            # find 50 models with name = John, skipping 2 pages (100)
            paginated_find(50, 3, search_params={"name":"John"})

            # find first 50 models ordered by `name` column
            paginated_find(50, order_by=["name"])

            # find first 50 models with reversed order by `name` column
            paginated_find(50, order_by=[("name", "desc")])

        :param items_per_page: Number of models to retrieve (required)
        :param page: Page to retrieve
        :param search_params: A mapping containing equality filters
        :param order_by: Column names, or `(column name, "asc" | "desc")`
            tuples, used to order the results
        :return: The page items together with the total items count and
            pagination metadata
        """
        find_stmt = self._find_query(search_params, order_by)
        paginated_stmt = self._paginate_query_by_page(find_stmt, page, items_per_page)

        with self._get_session() as session:
            # The count runs on the unpaginated statement.
            total_items_count = (
                session.execute(self._count_query(find_stmt)).scalar() or 0
            )
            result_items = list(session.execute(paginated_stmt).scalars())

            return PaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                page=page,
                items_per_page=self._sanitised_query_limit(items_per_page),
            )

    def cursor_paginated_find(
        self,
        items_per_page: int,
        cursor_reference: Union[CursorReference, None] = None,
        is_before_cursor: bool = False,
        search_params: Union[None, Mapping[str, Any]] = None,
    ) -> CursorPaginatedResult[MODEL]:
        """Find models using filters and cursor based pagination. Returned results
        do include pagination metadata.

        E.g.

            # finds first 50 models with name = John
            cursor_paginated_find(50, search_params={"name":"John"})

            # finds first 50 models after the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123))

            # finds last 50 models before the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123), True)

        :param items_per_page: Number of models to retrieve (required)
        :param cursor_reference: A cursor reference containing ordering column
            and threshold value
        :param is_before_cursor: If True it will return items before the cursor,
            otherwise items after
        :param search_params: A mapping containing equality filters
        :return: The page items together with the total items count and
            cursor pagination metadata
        """
        find_stmt = self._find_query(search_params)

        paginated_stmt = self._cursor_paginated_query(
            find_stmt,
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
            items_per_page=items_per_page,
        )

        with self._get_session() as session:
            # The count runs on the unpaginated statement.
            total_items_count = (
                session.execute(self._count_query(find_stmt)).scalar() or 0
            )
            result_items = list(session.execute(paginated_stmt).scalars())

            return CursorPaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                items_per_page=self._sanitised_query_limit(items_per_page),
                cursor_reference=cursor_reference,
                is_before_cursor=is_before_cursor,
            )

    @contextmanager
    def _get_session(self, commit: bool = True) -> Iterator[Session]:
        """Yield the session every repository operation runs on.

        The external session, when set, is yielded untouched; otherwise a
        session is requested from the handler, passing the negated `commit`
        flag (presumably a read-only switch - verify against SessionHandler).
        """
        if not self._external_session:
            with self._session_handler.get_session(not commit) as _session:
                yield _session
        else:
            yield self._external_session

get

get(identifier: PRIMARY_KEY) -> MODEL

Get a model by primary key.

Parameters:

Name Type Description Default
identifier PRIMARY_KEY

The primary key

required

Returns:

Type Description
MODEL

A model instance

Raises:

Type Description
ModelNotFoundError

No model has been found using the primary key

Source code in sqlalchemy_bind_manager/_repository/sync.py
78
79
80
81
82
83
84
85
86
87
88
89
def get(self, identifier: PRIMARY_KEY) -> MODEL:
    """Fetch a single model through its primary key.

    :param identifier: Primary key value to look up
    :return: The matching model instance
    :raises ModelNotFoundError: The lookup returned no model
    """
    with self._get_session(commit=False) as db_session:
        found = db_session.get(self._model, identifier)
    # Guard clause inverted: hand back the hit, otherwise signal the miss.
    if found is not None:
        return found
    raise ModelNotFoundError("No rows found for provided primary key.")

get_many

get_many(identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]

Get a list of models by primary keys.

Parameters:

Name Type Description Default
identifiers Iterable[PRIMARY_KEY]

A list of primary keys

required

Returns:

Type Description
List[MODEL]

A list of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
    """Fetch every model whose primary key is among the given ones.

    :param identifiers: Primary key values to look up
    :return: The models returned by the query, as a list
    """
    base_query = select(self._model)
    # The primary key attribute name is resolved through `_model_pk()`.
    pk_column = getattr(self._model, self._model_pk())
    stmt = base_query.where(pk_column.in_(identifiers))

    with self._get_session(commit=False) as db_session:
        rows = db_session.execute(stmt).scalars()
        return list(rows)

save

save(instance: MODEL) -> MODEL

Persist a model.

Parameters:

Name Type Description Default
instance MODEL

A mapped object instance to be persisted

required

Returns:

Type Description
MODEL

The model instance after being persisted

Source code in sqlalchemy_bind_manager/_repository/sync.py
104
105
106
107
108
109
110
111
112
113
def save(self, instance: MODEL) -> MODEL:
    """Add one model to a session and hand it back.

    :param instance: Mapped object to persist
    :return: The very same instance that was passed in
    """
    # Validation works on collections, hence the single-item list.
    candidates = [instance]
    self._fail_if_invalid_models(candidates)
    with self._get_session() as db_session:
        db_session.add(instance)
    return instance

save_many

save_many(instances: Iterable[MODEL]) -> Iterable[MODEL]

Persist many models in a single database session.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

A list of mapped objects to be persisted

required

Returns:

Type Description
Iterable[MODEL]

The model instances after being persisted

Source code in sqlalchemy_bind_manager/_repository/sync.py
115
116
117
118
119
120
121
122
123
124
def save_many(self, instances: Iterable[MODEL]) -> Iterable[MODEL]:
    """Persist many models within a single session block.

    :param instances: An iterable of mapped objects to be persisted;
        one-shot iterables (e.g. generators) are supported
    :return: A list holding the model instances after being persisted
    """
    # The collection is read by validation, by the session and by the
    # caller, so a one-shot iterable has to be materialised first.
    models = list(instances)
    self._fail_if_invalid_models(models)
    with self._get_session() as session:
        session.add_all(models)
    return models

delete

delete(instance: MODEL) -> None

Deletes a model.

Parameters:

Name Type Description Default
instance MODEL

The model instance

required
Source code in sqlalchemy_bind_manager/_repository/sync.py
126
127
128
129
130
131
132
133
def delete(self, instance: MODEL) -> None:
    """Remove one model through a session.

    :param instance: Mapped object to delete
    """
    # Validation works on collections, hence the single-item list.
    candidates = [instance]
    self._fail_if_invalid_models(candidates)
    with self._get_session() as db_session:
        db_session.delete(instance)

delete_many

delete_many(instances: Iterable[MODEL]) -> None

Deletes a collection of models in a single transaction.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

The model instances

required
Source code in sqlalchemy_bind_manager/_repository/sync.py
135
136
137
138
139
140
141
142
143
def delete_many(self, instances: Iterable[MODEL]) -> None:
    """Deletes a collection of models within a single session block.

    :param instances: An iterable of model instances; one-shot iterables
        (e.g. generators) are supported
    """
    # Read twice below (validation, then deletion): materialise once so a
    # one-shot iterable is not already exhausted when the loop starts.
    models = list(instances)
    self._fail_if_invalid_models(models)
    with self._get_session() as session:
        for model in models:
            session.delete(model)

find

find(search_params: Union[None, Mapping[str, Any]] = None, order_by: Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]] = None) -> List[MODEL]

Find models using filters.

E.g.

# find all models with name = John
find(search_params={"name":"John"})

# find all models ordered by `name` column
find(order_by=["name"])

# find all models with reversed order by `name` column
find(order_by=[("name", "desc")])

Parameters:

Name Type Description Default
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
List[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def find(
    self,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> List[MODEL]:
    """Search models, optionally filtering and ordering them.

    E.g.

        # find all models with name = John
        find(search_params={"name":"John"})

        # find all models ordered by `name` column
        find(order_by=["name"])

        # find all models with reversed order by `name` column
        find(order_by=[("name", "desc")])

    :param search_params: A mapping containing equality filters
    :param order_by: Column names, or `(column name, "asc" | "desc")`
        tuples, used to order the results
    :return: The models returned by the query, as a list
    """
    query = self._find_query(search_params, order_by)

    # NOTE(review): `get`/`get_many` request `commit=False`, while this
    # read-only query uses the default - confirm this is intended.
    with self._get_session() as db_session:
        return list(db_session.execute(query).scalars())

paginated_find

paginated_find(items_per_page: int, page: int = 1, search_params: Union[None, Mapping[str, Any]] = None, order_by: Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]] = None) -> PaginatedResult[MODEL]

Find models using filters and limit/offset pagination. Returned results do include pagination metadata.

E.g.

# find all models with name = John
paginated_find(search_params={"name":"John"})

# find first 50 models with name = John
paginated_find(50, search_params={"name":"John"})

# find 50 models with name = John, skipping 2 pages (100)
paginated_find(50, 3, search_params={"name":"John"})

# find all models ordered by `name` column
paginated_find(order_by=["name"])

# find all models with reversed order by `name` column
paginated_find(order_by=[("name", "desc")])

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
page int

Page to retrieve

1
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
PaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def paginated_find(
    self,
    items_per_page: int,
    page: int = 1,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> PaginatedResult[MODEL]:
    """Find models using filters and limit/offset pagination. Returned results
    do include pagination metadata.

    E.g.

        # find first 50 models with name = John
        paginated_find(50, search_params={"name":"John"})

        # find 50 models with name = John, skipping 2 pages (100)
        paginated_find(50, 3, search_params={"name":"John"})

        # find first 50 models ordered by `name` column
        paginated_find(50, order_by=["name"])

        # find first 50 models with reversed order by `name` column
        paginated_find(50, order_by=[("name", "desc")])

    :param items_per_page: Number of models to retrieve (required)
    :param page: Page to retrieve
    :param search_params: A mapping containing equality filters
    :param order_by: Column names, or `(column name, "asc" | "desc")`
        tuples, used to order the results
    :return: The page items together with the total items count and
        pagination metadata
    """
    find_stmt = self._find_query(search_params, order_by)
    paginated_stmt = self._paginate_query_by_page(find_stmt, page, items_per_page)

    with self._get_session() as session:
        # The count runs on the unpaginated statement.
        total_items_count = (
            session.execute(self._count_query(find_stmt)).scalar() or 0
        )
        result_items = list(session.execute(paginated_stmt).scalars())

        return PaginatedResultPresenter.build_result(
            result_items=result_items,
            total_items_count=total_items_count,
            page=page,
            items_per_page=self._sanitised_query_limit(items_per_page),
        )

cursor_paginated_find

cursor_paginated_find(items_per_page: int, cursor_reference: Union[CursorReference, None] = None, is_before_cursor: bool = False, search_params: Union[None, Mapping[str, Any]] = None) -> CursorPaginatedResult[MODEL]

Find models using filters and cursor based pagination. Returned results do include pagination metadata.

E.g.

# finds all models with name = John
cursor_paginated_find(search_params={"name":"John"})

# finds first 50 models with name = John
cursor_paginated_find(50, search_params={"name":"John"})

# finds first 50 models after the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123))

# finds last 50 models before the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123), True)

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
cursor_reference Union[CursorReference, None]

A cursor reference containing ordering column and threshold value

None
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

False
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None

Returns:

Type Description
CursorPaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/sync.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def cursor_paginated_find(
    self,
    items_per_page: int,
    cursor_reference: Union[CursorReference, None] = None,
    is_before_cursor: bool = False,
    search_params: Union[None, Mapping[str, Any]] = None,
) -> CursorPaginatedResult[MODEL]:
    """Find models using filters and cursor based pagination. Returned results
    do include pagination metadata.

    E.g.

        # finds first 50 models with name = John
        cursor_paginated_find(50, search_params={"name":"John"})

        # finds first 50 models after the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123))

        # finds last 50 models before the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123), True)

    :param items_per_page: Number of models to retrieve (required)
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :param search_params: A mapping containing equality filters
    :return: The page items together with the total items count and
        cursor pagination metadata
    """
    find_stmt = self._find_query(search_params)

    paginated_stmt = self._cursor_paginated_query(
        find_stmt,
        cursor_reference=cursor_reference,
        is_before_cursor=is_before_cursor,
        items_per_page=items_per_page,
    )

    with self._get_session() as session:
        # The count runs on the unpaginated statement.
        total_items_count = (
            session.execute(self._count_query(find_stmt)).scalar() or 0
        )
        result_items = list(session.execute(paginated_stmt).scalars())

        return CursorPaginatedResultPresenter.build_result(
            result_items=result_items,
            total_items_count=total_items_count,
            items_per_page=self._sanitised_query_limit(items_per_page),
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
        )

sqlalchemy_bind_manager.repository.SQLAlchemyAsyncRepository

Source code in sqlalchemy_bind_manager/_repository/async_.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class SQLAlchemyAsyncRepository(
    Generic[MODEL],
    BaseRepository[MODEL],
    SQLAlchemyAsyncRepositoryInterface[MODEL],
):
    """Asynchronous repository exposing CRUD and search helpers for one model.

    Sessions come either from an ``AsyncSessionHandler`` created from the
    given bind, or from a single session supplied by the caller.
    """

    # Only assigned when the repository is created with a bind.
    _session_handler: AsyncSessionHandler
    # Session supplied by the caller; ``None`` when a bind is used instead.
    _external_session: Union[AsyncSession, None]

    def __init__(
        self,
        bind: Union[SQLAlchemyAsyncBind, None] = None,
        session: Union[AsyncSession, None] = None,
        model_class: Union[Type[MODEL], None] = None,
    ) -> None:
        """Create the repository from exactly one of `bind` or `session`.

        :param bind: Bind used to create the internal session handler
        :param session: Externally managed session, yielded as-is
        :param model_class: Forwarded to the base repository initialiser
        :raises InvalidConfigError: Both or neither of `bind` and `session`
            were provided
        """
        super().__init__(model_class=model_class)
        # XOR: exactly one of the two session sources must be truthy.
        if not (bool(bind) ^ bool(session)):
            raise InvalidConfigError(
                "Either `bind` or `session` have to be used, not both"
            )
        self._external_session = session
        if bind:
            self._session_handler = AsyncSessionHandler(bind)

    async def get(self, identifier: PRIMARY_KEY) -> MODEL:
        """Get a model by primary key.

        :param identifier: The primary key
        :return: A model instance
        :raises ModelNotFoundError: No model has been found using the primary key
        """
        async with self._get_session(commit=False) as session:
            model = await session.get(self._model, identifier)
        if model is None:
            raise ModelNotFoundError("No rows found for provided primary key.")
        return model

    async def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
        """Get a list of models by primary keys.

        :param identifiers: A list of primary keys
        :return: A list of models
        """
        stmt = select(self._model).where(
            getattr(self._model, self._model_pk()).in_(identifiers)
        )

        async with self._get_session(commit=False) as session:
            return list((await session.execute(stmt)).scalars())

    async def save(self, instance: MODEL) -> MODEL:
        """Persist a model.

        :param instance: A mapped object instance to be persisted
        :return: The model instance after being persisted
        """
        self._fail_if_invalid_models([instance])
        async with self._get_session() as session:
            session.add(instance)
        return instance

    async def save_many(
        self,
        instances: Iterable[MODEL],
    ) -> Iterable[MODEL]:
        """Persist many models within a single session block.

        :param instances: An iterable of mapped objects to be persisted;
            one-shot iterables (e.g. generators) are supported
        :return: A list holding the model instances after being persisted
        """
        # The collection is read by validation, by the session and by the
        # caller, so a one-shot iterable has to be materialised first.
        models = list(instances)
        self._fail_if_invalid_models(models)
        async with self._get_session() as session:
            session.add_all(models)
        return models

    async def delete(self, instance: MODEL) -> None:
        """Deletes a model.

        :param instance: The model instance
        """
        self._fail_if_invalid_models([instance])
        async with self._get_session() as session:
            await session.delete(instance)

    async def delete_many(self, instances: Iterable[MODEL]) -> None:
        """Deletes a collection of models within a single session block.

        :param instances: An iterable of model instances; one-shot iterables
            (e.g. generators) are supported
        """
        # Read twice below (validation, then deletion): materialise once.
        models = list(instances)
        self._fail_if_invalid_models(models)
        async with self._get_session() as session:
            for instance in models:
                await session.delete(instance)

    async def find(
        self,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> List[MODEL]:
        """Find models using filters.

        E.g.

            # find all models with name = John
            find(search_params={"name":"John"})

            # find all models ordered by `name` column
            find(order_by=["name"])

            # find all models with reversed order by `name` column
            find(order_by=[("name", "desc")])

        :param search_params: A mapping containing equality filters
        :param order_by: Column names, or `(column name, "asc" | "desc")`
            tuples, used to order the results
        :return: A collection of models
        """
        stmt = self._find_query(search_params, order_by)

        # NOTE(review): `get`/`get_many` request `commit=False`, while this
        # read-only query uses the default - confirm this is intended.
        async with self._get_session() as session:
            result = await session.execute(stmt)
            return list(result.scalars())

    async def paginated_find(
        self,
        items_per_page: int,
        page: int = 1,
        search_params: Union[None, Mapping[str, Any]] = None,
        order_by: Union[
            None,
            Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
        ] = None,
    ) -> PaginatedResult[MODEL]:
        """Find models using filters and limit/offset pagination. Returned results
        do include pagination metadata.

        E.g.

            # find first 50 models with name = John
            paginated_find(50, search_params={"name":"John"})

            # find 50 models with name = John, skipping 2 pages (100)
            paginated_find(50, 3, search_params={"name":"John"})

            # find first 50 models ordered by `name` column
            paginated_find(50, order_by=["name"])

            # find first 50 models with reversed order by `name` column
            paginated_find(50, order_by=[("name", "desc")])

        :param items_per_page: Number of models to retrieve (required)
        :param page: Page to retrieve
        :param search_params: A mapping containing equality filters
        :param order_by: Column names, or `(column name, "asc" | "desc")`
            tuples, used to order the results
        :return: The page items together with the total items count and
            pagination metadata
        """
        find_stmt = self._find_query(search_params, order_by)
        paginated_stmt = self._paginate_query_by_page(find_stmt, page, items_per_page)

        async with self._get_session() as session:
            # The count runs on the unpaginated statement.
            total_items_count = (
                await session.execute(self._count_query(find_stmt))
            ).scalar() or 0
            result_items = list((await session.execute(paginated_stmt)).scalars())

            return PaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                page=page,
                items_per_page=self._sanitised_query_limit(items_per_page),
            )

    async def cursor_paginated_find(
        self,
        items_per_page: int,
        cursor_reference: Union[CursorReference, None] = None,
        is_before_cursor: bool = False,
        search_params: Union[None, Mapping[str, Any]] = None,
    ) -> CursorPaginatedResult[MODEL]:
        """Find models using filters and cursor based pagination. Returned results
        do include pagination metadata.

        E.g.

            # finds first 50 models with name = John
            cursor_paginated_find(50, search_params={"name":"John"})

            # finds first 50 models after the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123))

            # finds last 50 models before the one with "id" 123
            cursor_paginated_find(50, CursorReference(column="id", value=123), True)

        :param items_per_page: Number of models to retrieve (required)
        :param cursor_reference: A cursor reference containing ordering column
            and threshold value
        :param is_before_cursor: If True it will return items before the cursor,
            otherwise items after
        :param search_params: A mapping containing equality filters
        :return: The page items together with the total items count and
            cursor pagination metadata
        """
        find_stmt = self._find_query(search_params)
        paginated_stmt = self._cursor_paginated_query(
            find_stmt,
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
            items_per_page=items_per_page,
        )

        async with self._get_session() as session:
            # The count runs on the unpaginated statement.
            total_items_count = (
                await session.execute(self._count_query(find_stmt))
            ).scalar() or 0
            # A list is always produced here, so no `or []` fallback is needed.
            result_items = list((await session.execute(paginated_stmt)).scalars())

            return CursorPaginatedResultPresenter.build_result(
                result_items=result_items,
                total_items_count=total_items_count,
                items_per_page=self._sanitised_query_limit(items_per_page),
                cursor_reference=cursor_reference,
                is_before_cursor=is_before_cursor,
            )

    @asynccontextmanager
    async def _get_session(self, commit: bool = True) -> AsyncIterator[AsyncSession]:
        """Yield the session every repository operation runs on.

        The external session, when set, is yielded untouched; otherwise a
        session is requested from the handler, passing the negated `commit`
        flag (presumably a read-only switch - verify against
        AsyncSessionHandler).
        """
        if not self._external_session:
            async with self._session_handler.get_session(not commit) as _session:
                yield _session
        else:
            yield self._external_session

get async

get(identifier: PRIMARY_KEY) -> MODEL

Get a model by primary key.

Parameters:

Name Type Description Default
identifier PRIMARY_KEY

The primary key

required

Returns:

Type Description
MODEL

A model instance

Raises:

Type Description
ModelNotFoundError

No model has been found using the primary key

Source code in sqlalchemy_bind_manager/_repository/async_.py
78
79
80
81
82
83
84
85
86
87
88
89
async def get(self, identifier: PRIMARY_KEY) -> MODEL:
    """Fetch a single model by its primary key.

    :param identifier: The primary key
    :return: A model instance
    :raises ModelNotFoundError: No model has been found using the primary key
    """
    async with self._get_session(commit=False) as db_session:
        found = await db_session.get(self._model, identifier)

    # A `None` lookup result means nothing matched: report it as an error.
    if found is not None:
        return found
    raise ModelNotFoundError("No rows found for provided primary key.")

get_many async

get_many(identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]

Get a list of models by primary keys.

Parameters:

Name Type Description Default
identifiers Iterable[PRIMARY_KEY]

A list of primary keys

required

Returns:

Type Description
List[MODEL]

A list of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def get_many(self, identifiers: Iterable[PRIMARY_KEY]) -> List[MODEL]:
    """Fetch every model whose primary key is among the given ones.

    :param identifiers: A list of primary keys
    :return: A list of models
    """
    base_query = select(self._model)
    # The primary key attribute is resolved by name on the model class.
    pk_column = getattr(self._model, self._model_pk())
    query = base_query.where(pk_column.in_(identifiers))

    async with self._get_session(commit=False) as db_session:
        rows = await db_session.execute(query)
        return list(rows.scalars())

save async

save(instance: MODEL) -> MODEL

Persist a model.

Parameters:

Name Type Description Default
instance MODEL

A mapped object instance to be persisted

required

Returns:

Type Description
MODEL

The model instance after being persisted

Source code in sqlalchemy_bind_manager/_repository/async_.py
104
105
106
107
108
109
110
111
112
113
async def save(self, instance: MODEL) -> MODEL:
    """Add a model instance to the repository session.

    :param instance: A mapped object instance to be persisted
    :return: The model instance after being persisted
    """
    # Validation takes a collection, so the single instance is wrapped.
    candidates = [instance]
    self._fail_if_invalid_models(candidates)

    async with self._get_session() as db_session:
        db_session.add(instance)
    return instance

save_many async

save_many(instances: Iterable[MODEL]) -> Iterable[MODEL]

Persist many models in a single database session.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

A list of mapped objects to be persisted

required

Returns:

Type Description
Iterable[MODEL]

The model instances after being persisted

Source code in sqlalchemy_bind_manager/_repository/async_.py
115
116
117
118
119
120
121
122
123
124
125
126
127
async def save_many(
    self,
    instances: Iterable[MODEL],
) -> Iterable[MODEL]:
    """Persist many models in a single database session.

    :param instances: An iterable of mapped objects to be persisted; one-shot
        iterables such as generators are accepted
    :return: The model instances after being persisted, as a list
    """
    # Materialise once: the validation helper receives the iterable before
    # `add_all` does, so a one-shot iterator could be consumed there and
    # nothing would be added to the session.
    models = list(instances)
    self._fail_if_invalid_models(models)
    async with self._get_session() as session:
        session.add_all(models)
    return models

delete async

delete(instance: MODEL) -> None

Deletes a model.

Parameters:

Name Type Description Default
instance MODEL

The model instance

required
Source code in sqlalchemy_bind_manager/_repository/async_.py
129
130
131
132
133
134
135
136
async def delete(self, instance: MODEL) -> None:
    """Delete a single model through the repository session.

    :param instance: The model instance
    """
    # Validation takes a collection, so the single instance is wrapped.
    candidates = [instance]
    self._fail_if_invalid_models(candidates)

    async with self._get_session() as db_session:
        await db_session.delete(instance)

delete_many async

delete_many(instances: Iterable[MODEL]) -> None

Deletes a collection of models in a single transaction.

Parameters:

Name Type Description Default
instances Iterable[MODEL]

The model instances

required
Source code in sqlalchemy_bind_manager/_repository/async_.py
138
139
140
141
142
143
144
145
146
async def delete_many(self, instances: Iterable[MODEL]) -> None:
    """Deletes a collection of models in a single transaction.

    :param instances: The model instances; one-shot iterables such as
        generators are accepted
    """
    # Materialise once: the validation helper receives the iterable before
    # the deletion loop does, so a one-shot iterator could be consumed there
    # and no model would be deleted.
    models = list(instances)
    self._fail_if_invalid_models(models)
    async with self._get_session() as session:
        for instance in models:
            await session.delete(instance)

find async

find(search_params: Union[None, Mapping[str, Any]] = None, order_by: Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]] = None) -> List[MODEL]

Find models using filters.

E.g.

# find all models with name = John
find(search_params={"name":"John"})

# find all models ordered by `name` column
find(order_by=["name"])

# find all models with reversed order by `name` column
find(order_by=[("name", "desc")])

Parameters:

Name Type Description Default
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
List[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
async def find(
    self,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> List[MODEL]:
    """Retrieve the models matching the given filters.

    E.g.

        # find all models with name = John
        find(search_params={"name":"John"})

        # find all models ordered by `name` column
        find(order_by=["name"])

        # find all models with reversed order by `name` column
        find(order_by=[("name", "desc")])

    :param search_params: A mapping containing equality filters
    :param order_by: Ordering criteria: each entry is either a column name or
        a `(column name, "asc" | "desc")` tuple
    :return: A collection of models
    """
    query = self._find_query(search_params, order_by)

    # NOTE(review): unlike `get`/`get_many`, this read opens the session with
    # the default `commit=True` - confirm this is intentional.
    async with self._get_session() as db_session:
        rows = await db_session.execute(query)
        return list(rows.scalars())

paginated_find async

paginated_find(items_per_page: int, page: int = 1, search_params: Union[None, Mapping[str, Any]] = None, order_by: Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]] = None) -> PaginatedResult[MODEL]

Find models using filters and limit/offset pagination. Returned results do include pagination metadata.

E.g.

# find the first page of up to 10 models with name = John
paginated_find(10, search_params={"name":"John"})

# find first 50 models with name = John
paginated_find(50, search_params={"name":"John"})

# find 50 models with name = John, skipping 2 pages (100)
paginated_find(50, 3, search_params={"name":"John"})

# find the first 50 models ordered by `name` column
paginated_find(50, order_by=["name"])

# find the first 50 models with reversed order by `name` column
paginated_find(50, order_by=[("name", "desc")])

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
page int

Page to retrieve

1
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None
order_by Union[None, Iterable[Union[str, Tuple[str, Literal['asc', 'desc']]]]]
None

Returns:

Type Description
PaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
async def paginated_find(
    self,
    items_per_page: int,
    page: int = 1,
    search_params: Union[None, Mapping[str, Any]] = None,
    order_by: Union[
        None,
        Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]],
    ] = None,
) -> PaginatedResult[MODEL]:
    """Retrieve one page of the models matching the given filters, using
    limit/offset pagination. The result carries pagination metadata.

    E.g.

        # find first 50 models with name = John
        paginated_find(50, search_params={"name":"John"})

        # find 50 models with name = John, skipping 2 pages (100)
        paginated_find(50, 3, search_params={"name":"John"})

        # find first 50 models ordered by `name` column
        paginated_find(50, order_by=["name"])

        # find first 50 models with reversed order by `name` column
        paginated_find(50, order_by=[("name", "desc")])

    :param items_per_page: Number of models to retrieve
    :param page: Page to retrieve
    :param search_params: A mapping containing equality filters
    :param order_by: Ordering criteria: each entry is either a column name or
        a `(column name, "asc" | "desc")` tuple
    :return: A collection of models
    """
    query = self._find_query(search_params, order_by)
    page_query = self._paginate_query_by_page(query, page, items_per_page)

    # NOTE(review): unlike `get`/`get_many`, this read opens the session with
    # the default `commit=True` - confirm this is intentional.
    async with self._get_session() as db_session:
        # The count is taken on the unpaginated query.
        count_result = await db_session.execute(self._count_query(query))
        total = count_result.scalar() or 0

        page_result = await db_session.execute(page_query)
        items = list(page_result.scalars())

        return PaginatedResultPresenter.build_result(
            result_items=items,
            total_items_count=total,
            page=page,
            items_per_page=self._sanitised_query_limit(items_per_page),
        )

cursor_paginated_find async

cursor_paginated_find(items_per_page: int, cursor_reference: Union[CursorReference, None] = None, is_before_cursor: bool = False, search_params: Union[None, Mapping[str, Any]] = None) -> CursorPaginatedResult[MODEL]

Find models using filters and cursor based pagination. Returned results do include pagination metadata.

E.g.

# finds the first 10 models with name = John
cursor_paginated_find(10, search_params={"name":"John"})

# finds first 50 models with name = John
cursor_paginated_find(50, search_params={"name":"John"})

# finds first 50 models after the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123))

# finds last 50 models before the one with "id" 123
cursor_paginated_find(50, CursorReference(column="id", value=123), True)

Parameters:

Name Type Description Default
items_per_page int

Number of models to retrieve

required
cursor_reference Union[CursorReference, None]

A cursor reference containing ordering column and threshold value

None
is_before_cursor bool

If True it will return items before the cursor, otherwise items after

False
search_params Union[None, Mapping[str, Any]]

A mapping containing equality filters

None

Returns:

Type Description
CursorPaginatedResult[MODEL]

A collection of models

Source code in sqlalchemy_bind_manager/_repository/async_.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
async def cursor_paginated_find(
    self,
    items_per_page: int,
    cursor_reference: Union[CursorReference, None] = None,
    is_before_cursor: bool = False,
    search_params: Union[None, Mapping[str, Any]] = None,
) -> CursorPaginatedResult[MODEL]:
    """Retrieve one page of the models matching the given filters, using
    cursor based pagination. The result carries pagination metadata.

    E.g.

        # finds first 50 models with name = John
        cursor_paginated_find(50, search_params={"name":"John"})

        # finds first 50 models after the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123))

        # finds last 50 models before the one with "id" 123
        cursor_paginated_find(50, CursorReference(column="id", value=123), True)

    :param items_per_page: Number of models to retrieve
    :param cursor_reference: A cursor reference containing ordering column
        and threshold value
    :param is_before_cursor: If True it will return items before the cursor,
        otherwise items after
    :param search_params: A mapping containing equality filters
    :return: A collection of models
    """
    query = self._find_query(search_params)
    page_query = self._cursor_paginated_query(
        query,
        cursor_reference=cursor_reference,
        is_before_cursor=is_before_cursor,
        items_per_page=items_per_page,
    )

    # NOTE(review): unlike `get`/`get_many`, this read opens the session with
    # the default `commit=True` - confirm this is intentional.
    async with self._get_session() as db_session:
        # The count is taken on the unpaginated query.
        count_result = await db_session.execute(self._count_query(query))
        total = count_result.scalar() or 0

        page_result = await db_session.execute(page_query)
        items = list(page_result.scalars())

        return CursorPaginatedResultPresenter.build_result(
            result_items=items,
            total_items_count=total,
            items_per_page=self._sanitised_query_limit(items_per_page),
            cursor_reference=cursor_reference,
            is_before_cursor=is_before_cursor,
        )

sqlalchemy_bind_manager.repository.PaginatedResult

The result of a paginated query.

Parameters:

Name Type Description Default
items List[MODEL]

The models returned by the query

required
page_info PageInfo

The pagination metadata

required
Source code in sqlalchemy_bind_manager/_repository/common.py
55
56
57
58
59
60
61
62
63
64
65
66
class PaginatedResult(BaseModel, Generic[MODEL]):
    """
    The result of a paginated query.

    This is the declared return type of the repository `paginated_find`
    method.

    :param items: The models returned by the query
    :type items: List[MODEL]
    :param page_info: The pagination metadata
    :type page_info: PageInfo
    """

    # Models contained in the requested page.
    items: List[MODEL]
    # Page number, sizes, totals and next/previous flags for the query.
    page_info: PageInfo

sqlalchemy_bind_manager.repository.PageInfo

Paginated query metadata.

Parameters:

Name Type Description Default
page int

The current page number

required
items_per_page int

The maximum number of items in a page.

required
total_pages int

The number of available pages.

required
total_items int

The total items in all the pages.

required
has_next_page bool

True if there is a next page.

required
has_previous_page bool

True if there is a previous page.

required
Source code in sqlalchemy_bind_manager/_repository/common.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class PageInfo(BaseModel):
    """
    Paginated query metadata.

    Carried by the `page_info` field of `PaginatedResult`. Every field is
    declared without a default value.

    :param page: The current page number
    :type page: int
    :param items_per_page: The maximum number of items in a page.
    :type items_per_page: int
    :param total_pages: The number of available pages.
    :type total_pages: int
    :param total_items: The total items in all the pages.
    :type total_items: int
    :param has_next_page: True if there is a next page.
    :type has_next_page: bool
    :param has_previous_page: True if there is a previous page.
    :type has_previous_page: bool
    """

    page: int
    items_per_page: int
    total_pages: int
    total_items: int
    has_next_page: bool
    has_previous_page: bool

sqlalchemy_bind_manager.repository.CursorPaginatedResult

The result of a cursor paginated query.

Parameters:

Name Type Description Default
items List[MODEL]

The models returned by the query

required
page_info CursorPageInfo

The pagination metadata

required
Source code in sqlalchemy_bind_manager/_repository/common.py
102
103
104
105
106
107
108
109
110
111
112
113
class CursorPaginatedResult(BaseModel, Generic[MODEL]):
    """
    The result of a cursor paginated query.

    This is the declared return type of the repository
    `cursor_paginated_find` method.

    :param items: The models returned by the query
    :type items: List[MODEL]
    :param page_info: The pagination metadata
    :type page_info: CursorPageInfo
    """

    # Models contained in the requested page.
    items: List[MODEL]
    # Sizes, totals, next/previous flags and boundary cursors for the query.
    page_info: CursorPageInfo

sqlalchemy_bind_manager.repository.CursorPageInfo

Cursor-paginated query metadata.

Parameters:

Name Type Description Default
items_per_page int

The maximum number of items in a page.

required
total_items int

The total items in all the pages.

required
has_next_page bool

True if there is a next page.

required
has_previous_page bool

True if there is a previous page.

required
start_cursor Union[CursorReference, None]

The cursor pointing to the first item in the page, if at least one item is returned.

required
end_cursor Union[CursorReference, None]

The cursor pointing to the last item in the page, if at least one item is returned.

required
Source code in sqlalchemy_bind_manager/_repository/common.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class CursorPageInfo(BaseModel):
    """
    Cursor-paginated query metadata.

    Carried by the `page_info` field of `CursorPaginatedResult`.

    :param items_per_page: The maximum number of items in a page.
    :type items_per_page: int
    :param total_items: The total items in all the pages.
    :type total_items: int
    :param has_next_page: True if there is a next page. Defaults to False.
    :type has_next_page: bool
    :param has_previous_page: True if there is a previous page.
        Defaults to False.
    :type has_previous_page: bool
    :param start_cursor: The cursor pointing to the first item in the page,
        if at least one item is returned. Defaults to None.
    :type start_cursor: Union[CursorReference, None]
    :param end_cursor: The cursor pointing to the last item in the page,
        if at least one item is returned. Defaults to None.
    :type end_cursor: Union[CursorReference, None]
    """

    items_per_page: int
    total_items: int
    # The flags and cursors below are optional and fall back to "no
    # neighbouring page" / "no cursor" values.
    has_next_page: bool = False
    has_previous_page: bool = False
    start_cursor: Union[CursorReference, None] = None
    end_cursor: Union[CursorReference, None] = None