Publisher/Subscriber

PSLX implements the basic Pubsub model through Rabbitmq. The publisher is directly usable as all the basic building blocks are already implemented. In addition, here the message that goes through the pubsub model needs to be a proto message. Here is an example of publisher:

import time
from pslx.micro_service.pubsub.publisher import Publisher
from pslx.util.timezone_util import TimeSleepObj
from pslx.schema.rpc_pb2 import HealthCheckerRequest

if __name__ == "__main__":
    publisher1 = Publisher(
        exchange_name='test_exchange_1',
        topic_name='test1',
        connection_str='amqp://guest:guest@localhost:5672'
    )

    message1 = HealthCheckerRequest()
    message1.server_url = 'test1'
    message1.secure = False

    publisher2 = Publisher(
        exchange_name='test_exchange_2',
        topic_name='test2',
        connection_str='amqp://guest:guest@localhost:5672'
    )

    message2 = HealthCheckerRequest()
    message2.server_url = 'test2'
    message2.secure = True

    while True:
        print("Publish message...")
        publisher1.publish(message=message1)
        publisher2.publish(message=message2)
        time.sleep(TimeSleepObj.ONE_SECOND)

For subscriber, the user needs to wrap it in an operator with the following function defined:

pubsub_parse_message(exchange_name, topic_name, message)

which takes in the exchange and topic (routing_key) of the message. The purpose of this function to allow user to have the access of handling the message.

To bind the subscriber to the operator, one can use:

bind_to_op(op)

An example subscriber of the above publisher is

from pslx.streaming.operator import StreamingOperator
from pslx.streaming.container import DefaultStreamingContainer
from pslx.micro_service.pubsub.subscriber import Subscriber
from pslx.schema.rpc_pb2 import HealthCheckerRequest
from pslx.util.dummy_util import DummyUtil


class SubscriberExampleOp(StreamingOperator):

    def __init__(self):
        super().__init__(operator_name='subscriber_example_op')

    @staticmethod
    def pubsub_msg_parser(exchange_name, topic_name, message):
        print(exchange_name, topic_name, message)

    def execute_impl(self):
        subscriber = Subscriber(
            connection_str='amqp://guest:guest@localhost:5672'
        )
        subscriber.bind_to_op(self)

        subscriber.subscribe(
            exchange_name='test_exchange_1',
            topic_name='test1',
            message_type=HealthCheckerRequest
        )
        subscriber.subscribe(
            exchange_name='test_exchange_2',
            topic_name='test2',
            message_type=HealthCheckerRequest
        )
        subscriber.start()


class SubscriberExampleContainer(DefaultStreamingContainer):
    def __init__(self):
        super().__init__(container_name='subscriber_example_container')


if __name__ == "__main__":
    op = SubscriberExampleOp()
    container = SubscriberExampleContainer()
    container.add_operator_edge(from_operator=op, to_operator=DummyUtil.dummy_streaming_operator())

    container.initialize()
    container.execute()