Message Queue
PSLX also implements a message queue API. To use it, the application needs to inherit from the QueueBase class in queue_base.py
and override the function get_response_and_status_impl,
which takes a user-defined request proto message and returns
a user-defined response message together with a status. If the queue does not need a response, please return None as the response.
An example implementation of a queue for instant messaging, including its consumer (similar to the RPC implementation), is:
import json

import requests

from pslx.micro_service.message_queue.queue_base import QueueBase
from pslx.micro_service.message_queue.generic_consumer import GenericConsumer
from pslx.schema.enums_pb2 import Status
from pslx.schema.rpc_pb2 import InstantMessagingRPCRequest
from pslx.storage.partitioner_storage import DailyPartitionerStorage
from pslx.util.timezone_util import TimezoneUtil
class SlackQueue(QueueBase):
    """Queue that posts each instant-messaging request to a webhook URL.

    Every request handled by this queue is turned into a form-encoded
    ``payload`` field holding a JSON object with a single ``text`` entry and
    is POSTed to ``request.webhook_url``.
    """

    REQUEST_MESSAGE_TYPE = InstantMessagingRPCRequest
    # Upper bound (in seconds) on the webhook call; without a timeout a
    # stalled connection would block this handler indefinitely.
    _POST_TIMEOUT_SECONDS = 10

    def get_response_and_status_impl(self, request):
        """Send ``request.message`` to ``request.webhook_url``.

        Args:
            request: The incoming request; its ``message`` and
                ``webhook_url`` fields are read.

        Returns:
            A ``(response, status)`` tuple. The response is always ``None``
            because this queue produces no response message; the status is
            ``Status.SUCCEEDED`` when the POST completed with a non-error
            HTTP status and ``Status.FAILED`` otherwise.
        """
        header = {
            'Content-Type': "application/x-www-form-urlencoded",
            'Cache-Control': "no-cache",
        }
        text = request.message + "\nCurrent time is " + str(TimezoneUtil.cur_time_in_pst())
        # Serialize with json.dumps and let requests form-encode the field so
        # that quotes, '&', '%' or '+' in the message cannot corrupt the payload.
        form_fields = {'payload': json.dumps({'text': text})}

        status = Status.SUCCEEDED
        try:
            response = requests.post(
                request.webhook_url,
                data=form_fields,
                headers=header,
                timeout=self._POST_TIMEOUT_SECONDS,
            )
            # A 4xx/5xx reply means the message was not delivered.
            response.raise_for_status()
        except Exception as err:
            # Deliberately broad: a failed delivery is reported through the
            # status instead of propagating out of the handler.
            self._logger.error("Slack failed to send message with err " + str(err))
            status = Status.FAILED
        return None, status
if __name__ == "__main__":
    # Consumer attached to the AMQP broker on localhost.
    queue_consumer = GenericConsumer(connection_str='amqp://guest:guest@localhost:5672')

    # Storage handed to the queue, initialized from an existing directory.
    partitioner = DailyPartitionerStorage()
    partitioner.initialize_from_dir(dir_name="/galaxy/bb-d/pslx/test_data/storage2")

    # Register the Slack queue with the consumer, then start consuming.
    queue_consumer.bind_queue(
        queue=SlackQueue(queue_name='slack_queue', queue_storage=partitioner)
    )
    queue_consumer.start_consumer()
To use the message queue, one can also follow this example code for a producer:
from pslx.micro_service.message_queue.producer_base import ProducerBase
from pslx.schema.enums_pb2 import InstantMessagingType
from pslx.schema.rpc_pb2 import InstantMessagingRPCRequest
class SlackProducer(ProducerBase):
    """Producer that publishes Slack instant-messaging requests."""

    def send_message(self, channel_name, webhook_url, message):
        """Build an InstantMessagingRPCRequest and send it.

        Args:
            channel_name: Value stored in the request's ``channel_name`` field.
            webhook_url: Value stored in the request's ``webhook_url`` field.
            message: Value stored in the request's ``message`` field.
        """
        # Populate every field in one step through the message constructor.
        im_request = InstantMessagingRPCRequest(
            is_test=False,
            type=InstantMessagingType.SLACK,
            channel_name=channel_name,
            webhook_url=webhook_url,
            message=message,
        )
        self.send_request(request=im_request)
if __name__ == "__main__":
    # Producer publishing to 'slack_queue' through the AMQP broker on localhost.
    producer = SlackProducer(
        queue_name='slack_queue',
        connection_str='amqp://guest:guest@localhost:5672'
    )
    # The webhook URL below is a placeholder: a Slack incoming-webhook URL is a
    # credential, so substitute your own and never commit a real one.
    producer.send_message(
        channel_name='staging_test',
        webhook_url='https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX',
        message="hello world"
    )
Note that in order to use the message queue, RabbitMQ must be installed.