Microservices API

The Microservices API provides utilities for building distributed microservice architectures with message-based communication.

Core Components

ClientProxy

Bases: ABC

Represents a client proxy object for sending messages and emitting events to receivers on the other side.

NOTE: Some transports may not support all features. For more information, refer to the documentation of each transport (Kafka, Redis and TCP).

emit abstractmethod async

emit(pattern: str, data: Any | BaseDTO | BaseResponse | None = None, timeout: float = 20.0) -> None

Emits an event to the broker without pairing it with a response or waiting for one.

Parameters:

Name     Type                                 Description                              Default
pattern  str                                  Message pattern.                         required
data     Any | BaseDTO | BaseResponse | None  Data payload, if any. Defaults to None.  None

send async

send(pattern: str, data: Any | BaseDTO | BaseResponse | None = None, timeout: float = 20.0, response_type: type[T] = Any) -> T

Sends a message pattern request, waits for the result, and returns it.

Parameters:

Name     Type                                 Description                                        Default
pattern  str                                  A topic pattern, mostly used by message brokers.   required
data     Any | BaseDTO | BaseResponse | None  Request payload, if provided. Defaults to None.    None
timeout  float                                Response timeout in seconds. Defaults to 20.0.     20.0

Returns:

Name  Type  Description
Any   T     Response data
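
The difference between send (request-response) and emit (fire-and-forget) can be sketched without a broker. The InMemoryProxy class, the "math.add" and "math.added" patterns, and the add handler below are hypothetical stand-ins that only mirror the documented signatures. They are not the framework's ClientProxy, and a real proxy would normally be supplied by the framework and backed by one of the transports.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InMemoryProxy:
    """Hypothetical stand-in mirroring the documented emit/send signatures.

    It routes calls to local handlers so the calling convention can be shown
    without Kafka, Redis, or TCP. It is not the framework's implementation.
    """

    def __init__(self) -> None:
        self.handlers: dict[str, Callable[[Any], Awaitable[Any]]] = {}
        self.events: list[tuple[str, Any]] = []

    async def emit(self, pattern: str, data: Any = None, timeout: float = 20.0) -> None:
        # Fire-and-forget: record the event and return nothing.
        self.events.append((pattern, data))

    async def send(self, pattern: str, data: Any = None, timeout: float = 20.0) -> Any:
        # Request-response: await the handler, bounded by `timeout` seconds.
        handler = self.handlers[pattern]
        return await asyncio.wait_for(handler(data), timeout=timeout)


async def add(data: dict[str, int]) -> int:
    # Hypothetical handler for the "math.add" pattern.
    return data["a"] + data["b"]


async def main() -> tuple[int, list[tuple[str, Any]]]:
    proxy = InMemoryProxy()
    proxy.handlers["math.add"] = add
    # send() returns the handler's result.
    total = await proxy.send("math.add", {"a": 2, "b": 3}, timeout=1.0)
    # emit() returns None; nothing is awaited from a receiver.
    await proxy.emit("math.added", {"total": total})
    return total, proxy.events


result, events = asyncio.run(main())
```

In this sketch a handler that runs longer than the timeout makes send raise asyncio.TimeoutError. How the real proxy reports a timeout is not stated in this reference.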

send_as_observable async

send_as_observable(pattern: str, data: Any | BaseDTO | BaseResponse | None = None, timeout: float = 20.0, response_type: type[T] = Any) -> Observable[T]

Sends a message pattern request and returns an RxPY (ReactiveX) observable object.

Parameters:

Name     Type                                 Description                                        Default
pattern  str                                  A topic pattern, mostly used by message brokers.   required
data     Any | BaseDTO | BaseResponse | None  Request payload, if provided. Defaults to None.    None
timeout  float                                Response timeout in seconds. Defaults to 20.0.     20.0

unwrap

unwrap(rtype: type[T]) -> T

Unwraps the transport instance to the requested type.

Parameters:

Name   Type     Description             Default
rtype  type[T]  The type to unwrap to.  required

Raises:

Type                 Description
NotImplementedError  If the unwrapping is not implemented.

Returns:

Name  Type  Description
T     T     The unwrapped transport instance.
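
One way an unwrap(rtype) contract like this could behave is sketched below: a base class that raises NotImplementedError, and a subclass that hands back its underlying object only when it matches the requested type. FakeConnection, BaseSketch, and SketchTransport are hypothetical names for illustration. The framework's actual unwrapping logic is not shown in this reference.

```python
from typing import Any, TypeVar

T = TypeVar("T")


class FakeConnection:
    """Hypothetical stand-in for a transport's underlying client object."""


class BaseSketch:
    def unwrap(self, rtype: type[T]) -> T:
        # Default behaviour in this sketch: unwrapping is not implemented.
        raise NotImplementedError("unwrap is not implemented for this transport")


class SketchTransport(BaseSketch):
    def __init__(self, connection: Any) -> None:
        self._connection = connection

    def unwrap(self, rtype: type[T]) -> T:
        # Return the underlying instance only if it matches the requested type.
        if isinstance(self._connection, rtype):
            return self._connection
        raise TypeError(f"Cannot unwrap transport to {rtype.__name__}")


transport = SketchTransport(FakeConnection())
conn = transport.unwrap(FakeConnection)  # matches, so the connection is returned

try:
    transport.unwrap(int)  # unknown type for this transport
    unknown_type_rejected = False
except TypeError:
    unknown_type_rejected = True

try:
    BaseSketch().unwrap(FakeConnection)
    base_not_implemented = False
except NotImplementedError:
    base_not_implemented = True
```

Passing the expected type explicitly lets a type checker infer the return type at the call site, which is presumably why the signature takes type[T] rather than returning Any.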

Decorators

MessagePattern

Bases: ControllerDecoratorHook

__init__

__init__(pattern: str | int, transports: Transports | Sequence[Transports] | None = None)

EventPattern

Bases: ControllerDecoratorHook

__init__

__init__(pattern: str | int)

Transport Methods

KafkaTransporter

Bases: BaseTransporter

Kafka transporter implementation built on the Ascender framework's default request correlation model.

Supports both RPC and Event message patterns.

Raises:

Type       Description
TypeError  If the requested underlying transporter instance type is unknown.

listen async

listen()

Called each time the server starts.

close async

close()

Called each time the server stops.

unwrap

unwrap(rtype: type[T]) -> T

Unwraps the transporter to the requested type when the user needs the underlying instance.

RedisTransporter

Bases: BaseTransporter

Redis transporter implementation built on the Ascender framework's default request correlation model.

Supports both RPC and Event message patterns.

Raises:

Type       Description
TypeError  If the requested underlying transporter instance type is unknown.

listen async

listen()

Called when the server starts. Creates a single connection for publishing and derives a PubSub subscriber for channels based on the event bus subscriptions.

close async

close()

Closes the Redis connection.

unwrap

unwrap(rtype: type[T]) -> T

Returns the underlying Redis connection if requested.

TCPTransporter

Bases: BaseTransporter

Basic TCP transporter implementation built on the Ascender framework's request correlation model.

Supports both RPC and Event message patterns.

Raises:

Type       Description
TypeError  If the requested underlying transporter instance type is unknown.

NOTE

This transporter and its implementation are in beta and may change in future releases. Use with caution.

listen async

listen()

Starts the TCP server and listens for incoming connections.

close async

close()

Stops the TCP server.

unwrap

unwrap(rtype: type[T]) -> T

Returns the underlying TCP server instance.

Enums

Transports

Bases: Enum

TCP class-attribute instance-attribute

TCP = TCPTransporter

REDIS class-attribute instance-attribute

REDIS = RedisTransporter

KAFKA class-attribute instance-attribute

KAFKA = KafkaTransporter

Transport

Bases: TypedDict

Source code in ascender/common/microservices/types/transport.py
class Transport(TypedDict):
    transport: Transports
    options: dict[str | float | int, Any]

See Also