返回列表 发新帖

开源消息队列Java实现_消息队列

[复制链接]

6

主题

18

帖子

18

积分

新手上路

Rank: 1

积分
18
发表于 2024-9-19 19:48:37  | 显示全部楼层 | 阅读模式
基于Java的开源消息队列实现,提供异步通信机制,支持生产者消费者模型。它允许应用程序通过消息传递进行解耦,提高系统的可扩展性和可靠性。
开源消息队列Java实现有很多,其中比较流行的有Apache ActiveMQ、RabbitMQ和Kafka,下面分别介绍这三种消息队列的Java实现:

zbhjiw1fvzu4ims.jpg

zbhjiw1fvzu4ims.jpg


(图片来源网络,侵删)
1、Apache ActiveMQ
Apache ActiveMQ是一个完全支持JMS(Java Message Service)规范的消息代理,它支持多种语言客户端,包括Java,要使用ActiveMQ,首先需要添加相关依赖到项目中,以Maven为例:

    org.apache.activemq
    activemqclient
    5.16.3
创建一个生产者和一个消费者来发送和接收消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQExample {
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Destination destination = session.createQueue("test.queue");
        // 创建生产者
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
        producer.send(message);
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
        System.out.println("Received message: " + receivedMessage.getText());
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}
2、RabbitMQ
RabbitMQ是一个高性能、高可用的消息队列系统,支持多种协议,要在Java中使用RabbitMQ,需要添加以下依赖:

    com.rabbitmq
    amqpclient
    5.13.0
创建一个生产者和一个消费者来发送和接收消息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class RabbitMQExample {
    private final static String QUEUE_NAME = "test_queue";
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接和通道
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 发送消息
            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF8"));
            System.out.println("Sent message: " + message);
            // 接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) > {
                String receivedMessage = new String(delivery.getBody(), "UTF8");
                System.out.println("Received message: " + receivedMessage);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag > {});
        }
    }
}
3、Kafka

zbhji4xp4i51bke.jpg

zbhji4xp4i51bke.jpg


(图片来源网络,侵删)
Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序,要在Java中使用Kafka,需要添加以下依赖:

    org.apache.kafka
    kafkaclients
    2.8.0
创建一个生产者和一个消费者来发送和接收消息:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Arrays;
import java.util.Collections;
public class KafkaExample {
    private final static String TOPIC_NAME = "test_topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    public static void main(String[] args) {
        // 生产者配置
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 消费者配置
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // 生产者示例
        try (Producer producer = new KafkaProducer(producerProps)) {
            String message = "Hello, Kafka!";
            producer.send(new ProducerRecord(TOPIC_NAME, message));
            System.out.println("Sent message: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 消费者示例
        try (Consumer consumer = new KafkaConsumer(consumerProps)) {
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            while (true) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord record : records) {
                    System.out.printf("Received message: %s%n", record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
就是三种开源消息队列Java实现的简要介绍,在实际项目中,可以根据需求选择合适的消息队列进行使用。

以下是一个简单的介绍,列出了一些开源的消息队列项目,以及它们对应的Java实现:
消息队列名称 描述 Java实现情况
Apache Kafka 高吞吐量的分布式消息系统,常用于构建实时的数据管道和流式应用程序。 有官方的Java客户端。
RabbitMQ 基于AMQP协议的开源消息代理软件,用于在分布式系统中存储转发消息。 有官方的Java客户端。
Apache ActiveMQ 支持多种协议和数据格式的消息队列。 有官方的Java客户端。
RocketMQ 阿里巴巴开源的消息中间件,用于处理大规模消息的传递。 完全使用Java开发,提供Java客户端。
Pulsar 由Apache软件基金会孵化的分布式发布订阅消息传递系统。 有官方的Java客户端。
Redis 虽然不是传统意义上的消息队列,但可以用作消息队列使用。 有多个Java客户端,如Jedis和Lettuce。
ZeroMQ 一个嵌入式的网络通信库,也可以用于消息队列的场景。 有Java绑定(JeroMQ)。

请注意,这个介绍仅作为一个简单的参考,具体使用时需要根据项目的实际需求和特性进行选择。

zbhj2fdkalut1ye.png

zbhj2fdkalut1ye.png


(图片来源网络,侵删)
回复

使用道具 举报

发表回复

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表