포스트

RabbitMQ Health Check

목차

  1. RabbitMQ Health Check

RabbitMQ Health Check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pika

def rabbitmq_health_check():
  """Check that a RabbitMQ connection and channel can be opened.

  Builds credentials and connection parameters from RABBITMQ_USER,
  RABBITMQ_PASSWORD and RABBITMQ_HOST (names expected to be defined
  outside this function), opens a blocking connection and a channel on it,
  closes whatever was opened, and prints the outcome.

  Returns:
    None. The result is reported via ``print`` only.

  Raises:
    Nothing for ``pika.exceptions.AMQPConnectionError``; that error is
    caught and reported. Any other exception propagates to the caller,
    after the connection has been closed.
  """
  try:
    creds = pika.PlainCredentials(username=RABBITMQ_USER, password=RABBITMQ_PASSWORD)
    params = pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=creds)
    connection = pika.BlockingConnection(params)

    channel = None
    try:
      channel = connection.channel()
      # Capture the state before closing; both flags turn False afterwards.
      connection_ok = connection.is_open
      channel_ok = channel.is_open
    finally:
      # Always release what was opened, even if channel() raised an error
      # that is not handled by the outer except clause.
      if channel is not None and channel.is_open:
        channel.close()
      if connection.is_open:
        connection.close()

    if connection_ok and channel_ok:
      print('Success to connect to RabbitMQ Connection & Channel')
    elif not connection_ok:
      # A connection that is not open is a failure, not a success.
      print('Failed to connect to RabbitMQ Connection')
    else:
      # The connection was open but the channel was not.
      print('Success to connect to RabbitMQ Connection, but failed to open Channel')

  except pika.exceptions.AMQPConnectionError as e:
    print(f"Failed to connect to RabbitMQ : {e}")