封装的Redis队列

MyRedisQueue.py

#!usr/bin/env python2.7
# -*- coding: utf-8 -*-

import redis


class RedisQueue(object):
    def __init__(self, name, namespace='queue', **redis_kwargs):
        # redis的默认参数为:host='localhost', port=6379, db=0, 其中db为定义redis database的数量
        # r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        # 加上decode_responses=True,写入的键值对中的value为str类型,不加这个参数写入的则为字节类型
        # host是redis主机,需要redis服务端和客户端都启动 redis默认端口是6379
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)  # 返回队列里面list内元素的数量

    def put(self, item, timeout=None):
        self.__db.rpush(self.key, item)  # 添加新元素到队列最右方
        if isinstance(timeout, int):
            self.__db.expire(self.key, timeout)

    def get_wait(self, timeout=None):
        # 返回队列第一个元素,如果为空则等待至有元素被加入队列(超时时间阈值为timeout,如果为None则一直等待)
        item = self.__db.blpop(self.key, timeout=timeout)
        # if item:
        #     item = item[1]  # 返回值为一个tuple
        return item

    def get_nowait(self):
        # 直接返回队列第一个元素,如果队列为空返回的是None
        item = self.__db.lpop(self.key)
        return item

    def get_all(self):
        items = []
        while True:
            result = self.get_nowait()
            if result:
                items.append(eval(result))
            else:
                break
        return items

接收端

#!usr/bin/env python2.7
# -*- coding: utf-8 -*-

from MyRedisQueue import RedisQueue

queue_name = "q1"
retCode = {"status": {"code": 0, "msg": "success"}}
redis_queue = RedisQueue(queue_name)
ret = redis_queue.get_wait(30)  # 阻塞等待30s,直到队列中有元素进来
if ret is None:
    retCode["status"]["code"] = 2
    retCode["status"]["msg"] = "超时未响应"

发送端

#!usr/bin/env python2.7
# -*- coding: utf-8 -*-

from MyRedisQueue import RedisQueue

queue_name = "q1"
redis_queue = RedisQueue(queue_name)
redis_queue.put("all done")
扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄