欢迎投稿

今日深度:

redis消息模式,redis消息

redis消息模式,redis消息


消息通知

  一般来说,消息队列有两种场景,一种是生产者消费者模式,一种是发布者订阅者模式。利用redis这两种场景的消息队列都能实现。

  1、生产者消费者模式

  生产者生产消息放到队列中,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息,即对于每个消息最多只能被一个消费者拥有。

  具体的方法就是创建一个任务队列,生产者主动lpush消息,而消费者去rpop数据。但是这样存在一个问题,就是消费者需要主动去请求数据,周期性的请求会造成资源的浪费。如果可以实现一旦有新消息加入队列就通知消费者就好了,这时借助brpop命令就可以实现这样的需求。brpop和rpop命令相似,唯一区别就是当列表中没有元素时,brpop命令会一直阻塞住连接,直到有新元素加入。

BRPOP key timeout

  brpop命令接收两个参数,第一个参数key为键值,第二个参数timeout为超时时间。BRPOP命令取数据时候,如果暂时不存在数据,该命令会一直阻塞直到达到超时时间。如果timeout设置为0,那么就会无限等待下去。

  

  2、发布者订阅者模式

  发布者生产消息放到队列里,多个监听队列的订阅者都会受到同一份消息。

  生产者使用下面命令来发布消息:

PUBLISH CHANNEL MESSAGE

  订阅者通过下面的命令来订阅消息,执行subscribe命令后,客户端进入订阅状态,处于此状态的客户端不能使用4个属于“发布/订阅”模型的命令之外的命令。另外,可以使用subscribe channel1.1 channel1.2 ... 同时订阅多个频道。

SUBSCRIBE CHANNEL

  

  3、Java实现的redis的消息队列

  在jedis中,有对应的方法进行订阅和发布,为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。

  下面我们要实现三个类,一个对应publish,一个对应subscribe,一个对应要传递的对象实体类:

  实体类:

import java.io.Serializable;
/**
 * 实体类
 * 封装消息
 * @author Administrator
 *
 */
public class Message implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String title;
    private String content;
    public String getTitle()
    {
        return title;
    }
    public void setTitle(String title)
    {
        this.title = title;
    }
    public String getContent()
    {
        return content;
    }
    public void setContent(String content)
    {
        this.content = content;
    }
}

  Publish类:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import redis.clients.jedis.Jedis;
/**
 * 发布者,用于发布消息
 * @author Administrator
 *
 */
public class TestPub
{
    public static void main(String[] args)
    {
        Jedis jedis = new Jedis("127.0.0.1");
        try
        {
            Message message = new Message();
            message.setTitle("体育新闻");
            message.setContent("著名NBA球星科比退役了!");
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(message);
            String msg1 = baos.toString("ISO-8859-1");

            jedis.publish("foo", msg1);
        } catch (Exception e)
        {
            e.printStackTrace();
        }
        jedis.close();
    }
}

  Subscribe类:

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
 * 订阅者,用于接收消息
 * @author Administrator
 *
 */
public class TestSub
{
    public static void main(String[] args)
    {
        Jedis jedis = new Jedis("127.0.0.1");
        JedisPubSub jedisPubSub = new JedisPubSub() 
        {
            @Override
			public void onUnsubscribe(String channel, int subscribedChannels) {
				 // 取消订阅时候的处理  
				System.out.println("1");
			}


			@Override
			public void onSubscribe(String channel, int subscribedChannels) {
				// 初始化订阅时候的处理  
				System.out.println("2");
			}


			@Override
			public void onPUnsubscribe(String pattern, int subscribedChannels) {
				 // // 取消按表达式的方式订阅时候的处理  
				System.out.println("3");
			}
			
			@Override
			public void onPSubscribe(String pattern, int subscribedChannels) {
				 //   // 初始化按表达式的方式订阅时候的处理  
				System.out.println("4");
			}


			@Override
			public void onPMessage(String pattern, String channel,
					String message) {
				//// 取得按表达式的方式订阅的消息后的处理  
				System.out.println("5");
			}

            @Override
            public void onMessage(String channel, String message)
            {// 取得订阅的消息后的处理  
                try
                {
                    ByteArrayInputStream bis = new ByteArrayInputStream(message.getBytes("ISO-8859-1"));
            // 此处指定字符集将字符串编码成字节数组,此处的字符集需要与发布时的字符集保持一致 ObjectInputStream ois = new ObjectInputStream(bis); Message message2= (Message) ois.readObject(); System.out.println(message2.getTitle()+"\n"+message2.getContent()); } catch (Exception e) { e.printStackTrace(); } finally { } } }; jedis.subscribe(jedisPubSub, "foo"); jedis.close(); } }

www.htsjk.Com true http://www.htsjk.com/redis/35607.html NewsArticle redis消息模式,redis消息 消息通知 一般来说,消息队列有两种场景,一种是生产者消费者模式,一种是发布者订阅者模式。利用redis这两种场景的消息队列都能实现。 1、生产者消费者模...
相关文章
    暂无相关文章
评论暂时关闭