redis工具封装及使用
编程 部署应用 服务器 redis 10

1、docker安装redis

  1. 拉取redis镜像

docker search redis
docker pull docker.io/redis

  1. 创建redis容器

创建 data 、conf、以及redis.conf配置文件,如若不然,无法通过redis deskTop manager访问

mkdir -p /root/redis/data //-p 表示递归创建 如果没有就创建
mkdir -p /root/redis/conf
vim /root/redis/conf/redis.conf //创建redis.conf 配置文件 文件内容如下

给配置文件添加权限:chmod 644 /root/redis/conf/redis.conf

创建容器

docker run --name myredis \
  -v /root/redis/data:/data \
  -v /root/redis/conf/redis.conf:/etc/redis/redis.conf 
  -d -p 6379:6379 redis redis-server /etc/redis/redis.conf
  • --name 给容器起名字

  • -v 将物理机地文件挂载到docker容器内

  • -d 后台运行

  • -p 将物理机端口映射到容器端口上

  • redis 镜像名称 (通过docker pull下来的镜像可通过 docker images 查看)

  • redis-server /etc/redis/redis/conf //启动redis程序并挂载配置文件

进入容器:

docker exec -it myredis redis-cli

2、配置远程连接

开启服务器端的6379端口,防火墙

在另一个redis-cli中远程登录:

redis-cli -h your_server_ip -p 6379

3、使用python封装redis工具

from typing import Optional, Any, Union
import redis
from config.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWD,  DB_ID, MAX_CONNECTIONS, EXPIRE_TIME
from common.logger import logger

class RedisClient:
    def __init__(self, host: str = REDIS_HOST, port: int = REDIS_PORT, 
                 password: str = REDIS_PASSWD, db: int = DB_ID,
                 max_connections: int = MAX_CONNECTIONS):
        """
        初始化 Redis 连接
        
        :param host: Redis 服务器地址
        :param port: Redis 服务器端口
        :param password: Redis 密码
        :param db: 数据库编号
        :param max_connections: 连接池最大连接数
        """
        self.pool = redis.ConnectionPool(
            host=host,
            port=port,
            password=password,
            db=db,
            max_connections=max_connections,
            decode_responses=True
        )
        # self.expire = expire
        self._redis = redis.Redis(connection_pool=self.pool)
    
    @property
    def client(self) -> redis.Redis:
        """获取客户端实例"""
        return self._redis
    
    def set_value(self, key : str, value : Any, expire : int = EXPIRE_TIME) -> bool:
        """
        设置键值对,可设置过期时间
        
        :param key: 键名
        :param value: 值
        :param expire: 过期时间(秒)
        :return: 是否设置成功
        """
        try:
            if expire:
                return self._redis.setex(key, expire, value)
            return self._redis.set(key, value)
        except redis.RedisError as e:
            # raise RedisOperationError(f"Redis 设置键值失败: {e}")
            logger.error(f"Redis 设置键值失败: {e}")

    def get_value(self, key : str) -> Optional[str]:
        """
        获取值
        :param key: 键
        :return: 键值,如果不存在则返回None
        """
        try:
            return self._redis.get(key)
        except redis.RedisError as e:
            logger.error(f"Redis 获取键值失败:{e}")
    
    def delete_key(self, *key: str) -> int:
        """
        删除一个或多个键
        :param key: 要删除的键名
        :return: 返回删除的个数
        """
        try:
            return self._redis.delete(*key)
        except redis.RedisError as e:
            logger.error(f"Redis 删除键失败:{e}")

    def expire_key(self, key: str, expire: int) -> bool:
        """
        设置键的过期时间
        
        :param key: 键名
        :param expire: 过期时间(秒)
        :return: 是否设置成功
        """
        try:
            return self._redis.expire(key, expire)
        except redis.RedisError as e:
            logger.error(f"Redis 设置过期时间失败: {e}")

    def key_exists(self, key: str) -> bool:
        """
        检查键是否存在
        
        :param key: 键名
        :return: 键是否存在
        """
        try:
            return self._redis.exists(key) == 1
        except redis.RedisError as e:
            logger.error(f"Redis 检查键存在失败: {e}")

redis_client = RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWD,  DB_ID, MAX_CONNECTIONS)

4、测试redis工具

def test():
    # 设置键值
    redis_client.set_value("user:token:123", "abc123", expire=3600)

    # 获取值
    token = redis_client.get_value("user:token:123")
    print(token)  # 输出: abc123

    # 检查键是否存在
    if redis_client.key_exists("user:token:123"):
        print("键存在")

    # 删除键
    del_num = redis_client.delete_key("user:token:123")
    print(f"删除了几个:{del_num}")

    # # 使用原生 Redis 客户端(如果需要)
    # r = redis_client.client
    # r.sadd("users", "user1", "user2")

test()

redis工具封装及使用
http://47.92.222.121:8090/archives/hEy9kQ3E
作者
禧语许
发布于
更新于
许可