Skip to content

Bind manager

sqlalchemy_bind_manager

sqlalchemy_bind_manager.SQLAlchemyBindManager

Source code in sqlalchemy_bind_manager/_bind_manager.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class SQLAlchemyBindManager:
    """
    Builds and stores one SQLAlchemy bind for each provided configuration.

    A bind groups together an engine, a mapper registry, a session factory
    and a declarative base class. Binds are stored and retrieved by name.
    """

    __binds: MutableMapping[str, Union[SQLAlchemyBind, SQLAlchemyAsyncBind]]

    def __init__(
        self,
        config: Union[
            Mapping[str, SQLAlchemyConfig],
            SQLAlchemyConfig,
        ],
    ) -> None:
        """
        Initialises a bind for every configuration received.

        :param config: Either a single configuration, registered under
            `DEFAULT_BIND_NAME`, or a mapping of bind names to configurations
        :raises InvalidConfigError: if any configuration is not
            a `SQLAlchemyConfig` object
        """
        self.__binds = {}
        if isinstance(config, Mapping):
            for name, conf in config.items():
                self.__init_bind(name, conf)
        else:
            self.__init_bind(DEFAULT_BIND_NAME, config)

    def __init_bind(self, name: str, config: SQLAlchemyConfig) -> None:
        """
        Validates a configuration and registers the resulting bind.

        :param name: The name the bind is registered under
        :param config: The configuration used to build the bind
        :raises InvalidConfigError: if `config` is not a `SQLAlchemyConfig`
        """
        if not isinstance(config, SQLAlchemyConfig):
            raise InvalidConfigError(
                f"Config for bind `{name}` is not a SQLAlchemyConfig object"
            )

        # New dicts are built on purpose: the defaults must not be written
        # back into the dicts owned by the caller's config object.
        # Values provided in the config always win over the defaults.
        engine_options: dict = {
            "echo": False,
            "future": True,
            **(config.engine_options or {}),
        }
        session_options: dict = {
            "expire_on_commit": False,
            "autobegin": False,
            **(config.session_options or {}),
        }

        if config.async_engine:
            self.__binds[name] = self.__build_async_bind(
                engine_url=config.engine_url,
                engine_options=engine_options,
                session_options=session_options,
            )
        else:
            self.__binds[name] = self.__build_sync_bind(
                engine_url=config.engine_url,
                engine_options=engine_options,
                session_options=session_options,
            )

    def __build_sync_bind(
        self,
        engine_url: str,
        engine_options: dict,
        session_options: dict,
    ) -> SQLAlchemyBind:
        """
        Creates a sync engine and wraps it in a bind, together with
        a new mapper registry, its declarative base and a session factory.

        :param engine_url: The URL passed to `create_engine`
        :param engine_options: Keyword arguments passed to `create_engine`
        :param session_options: Keyword arguments passed to `sessionmaker`
        :return: the sync bind object
        """
        registry_mapper = registry()
        engine = create_engine(engine_url, **engine_options)
        return SQLAlchemyBind(
            engine=engine,
            registry_mapper=registry_mapper,
            session_class=sessionmaker(
                bind=engine,
                class_=Session,
                **session_options,
            ),
            declarative_base=registry_mapper.generate_base(),
        )

    def __build_async_bind(
        self,
        engine_url: str,
        engine_options: dict,
        session_options: dict,
    ) -> SQLAlchemyAsyncBind:
        """
        Creates an async engine and wraps it in a bind, together with
        a new mapper registry, its declarative base and a session factory.

        :param engine_url: The URL passed to `create_async_engine`
        :param engine_options: Keyword arguments passed
            to `create_async_engine`
        :param session_options: Keyword arguments passed
            to `async_sessionmaker`
        :return: the async bind object
        """
        registry_mapper = registry()
        engine = create_async_engine(engine_url, **engine_options)
        return SQLAlchemyAsyncBind(
            engine=engine,
            registry_mapper=registry_mapper,
            session_class=async_sessionmaker(
                bind=engine,
                **session_options,
            ),
            declarative_base=registry_mapper.generate_base(),
        )

    def get_bind_mappers_metadata(self) -> Mapping[str, MetaData]:
        """
        Returns the registered mappers metadata in a format
        that can be used in Alembic configuration

        :returns: mappers metadata, keyed by bind name
        """
        return {k: b.registry_mapper.metadata for k, b in self.__binds.items()}

    def get_bind(
        self, bind_name: str = DEFAULT_BIND_NAME
    ) -> Union[SQLAlchemyBind, SQLAlchemyAsyncBind]:
        """
        Returns a bind object by name.

        :param bind_name: A registered bind name
        :return: a bind object
        :raises NotInitializedBindError: if no bind is registered
            under `bind_name`
        """
        try:
            return self.__binds[bind_name]
        except KeyError as err:
            # Chain explicitly: the KeyError carries the missing bind name
            raise NotInitializedBindError("Bind not initialized") from err

    def get_binds(self) -> Mapping[str, Union[SQLAlchemyBind, SQLAlchemyAsyncBind]]:
        """
        Returns all the registered bind objects.

        :returns: A mapping containing the registered binds
        """
        return self.__binds

    def get_mapper(self, bind_name: str = DEFAULT_BIND_NAME) -> registry:
        """
        Returns the registered SQLAlchemy registry_mapper for the given bind name

        :param bind_name: A registered bind name
        :return: the registered registry_mapper
        :raises NotInitializedBindError: if no bind is registered
            under `bind_name`
        """
        return self.get_bind(bind_name).registry_mapper

    def get_session(
        self, bind_name: str = DEFAULT_BIND_NAME
    ) -> Union[Session, AsyncSession]:
        """
        Returns a SQLAlchemy Session object, ready to be used either
        directly or as a context manager

        :param bind_name: A registered bind name
        :return: The SQLAlchemy Session object
        :raises NotInitializedBindError: if no bind is registered
            under `bind_name`
        """
        return self.get_bind(bind_name).session_class()

get_bind_mappers_metadata

get_bind_mappers_metadata() -> Mapping[str, MetaData]

Returns the registered mappers metadata in a format that can be used in Alembic configuration

Returns:

Type Description
Mapping[str, MetaData]

mappers metadata

Source code in sqlalchemy_bind_manager/_bind_manager.py
154
155
156
157
158
159
160
161
def get_bind_mappers_metadata(self) -> Mapping[str, MetaData]:
    """
    Collects the metadata of each bind's mapper registry, keyed by
    bind name, in a format that can be used in Alembic configuration

    :returns: mappers metadata
    """
    metadata_by_bind = {}
    for bind_name, bind in self.__binds.items():
        metadata_by_bind[bind_name] = bind.registry_mapper.metadata
    return metadata_by_bind

get_bind

get_bind(bind_name: str = DEFAULT_BIND_NAME) -> Union[SQLAlchemyBind, SQLAlchemyAsyncBind]

Returns a bind object by name.

Parameters:

Name Type Description Default
bind_name str

A registered bind name

DEFAULT_BIND_NAME

Returns:

Type Description
Union[SQLAlchemyBind, SQLAlchemyAsyncBind]

a bind object

Source code in sqlalchemy_bind_manager/_bind_manager.py
163
164
165
166
167
168
169
170
171
172
173
174
175
def get_bind(
    self, bind_name: str = DEFAULT_BIND_NAME
) -> Union[SQLAlchemyBind, SQLAlchemyAsyncBind]:
    """
    Returns a bind object by name.

    :param bind_name: A registered bind name
    :return: a bind object
    :raises NotInitializedBindError: if no bind is registered
        under `bind_name`
    """
    try:
        return self.__binds[bind_name]
    except KeyError as err:
        # Chain explicitly: the KeyError carries the missing bind name
        raise NotInitializedBindError("Bind not initialized") from err

get_binds

get_binds() -> Mapping[str, Union[SQLAlchemyBind, SQLAlchemyAsyncBind]]

Returns all the registered bind objects.

Returns:

Type Description
Mapping[str, Union[SQLAlchemyBind, SQLAlchemyAsyncBind]]

A mapping containing the registered binds

Source code in sqlalchemy_bind_manager/_bind_manager.py
177
178
179
180
181
182
183
def get_binds(self) -> Mapping[str, Union[SQLAlchemyBind, SQLAlchemyAsyncBind]]:
    """
    Gives access to every bind registered in this manager.

    :returns: The registered binds, keyed by bind name
    """
    registered_binds = self.__binds
    return registered_binds

get_mapper

get_mapper(bind_name: str = DEFAULT_BIND_NAME) -> registry

Returns the registered SQLAlchemy registry_mapper for the given bind name

Parameters:

Name Type Description Default
bind_name str

A registered bind name

DEFAULT_BIND_NAME

Returns:

Type Description
registry

the registered registry_mapper

Source code in sqlalchemy_bind_manager/_bind_manager.py
185
186
187
188
189
190
191
192
def get_mapper(self, bind_name: str = DEFAULT_BIND_NAME) -> registry:
    """
    Looks up a bind by name and hands back its SQLAlchemy registry_mapper

    :param bind_name: A registered bind name
    :return: the registry_mapper stored on that bind
    """
    bind = self.get_bind(bind_name)
    return bind.registry_mapper

get_session

get_session(bind_name: str = DEFAULT_BIND_NAME) -> Union[Session, AsyncSession]

Returns a SQLAlchemy Session object, ready to be used either directly or as a context manager

Parameters:

Name Type Description Default
bind_name str

A registered bind name

DEFAULT_BIND_NAME

Returns:

Type Description
Union[Session, AsyncSession]

The SQLAlchemy Session object

Source code in sqlalchemy_bind_manager/_bind_manager.py
194
195
196
197
198
199
200
201
202
203
204
def get_session(
    self, bind_name: str = DEFAULT_BIND_NAME
) -> Union[Session, AsyncSession]:
    """
    Creates a new SQLAlchemy Session object from the session factory
    of the requested bind. The session can be used either directly
    or as a context manager

    :param bind_name: A registered bind name
    :return: The SQLAlchemy Session object
    """
    session_factory = self.get_bind(bind_name).session_class
    return session_factory()

sqlalchemy_bind_manager.SQLAlchemyConfig

Configuration for engines

Source code in sqlalchemy_bind_manager/_bind_manager.py
41
42
43
44
45
46
47
48
49
class SQLAlchemyConfig(BaseModel):
    """
    Configuration for engines

    Describes a single bind: the engine URL, optional keyword options for
    the engine and for the session factory, and whether the engine is async.
    """

    # URL handed to `create_engine` / `create_async_engine`
    engine_url: str
    # Extra keyword arguments for the engine factory; `None` means no extras
    engine_options: Union[dict, None] = None
    # Extra keyword arguments for the session factory; `None` means no extras
    session_options: Union[dict, None] = None
    # When true the bind manager builds an async bind instead of a sync one
    async_engine: StrictBool = False