#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Contact :   liuyuqi.gov@msn.cn
@Time    :   2024/03/25 13:59:44
@License :   Copyright © 2017-2022 liuyuqi. All Rights Reserved.
@Desc    :   redis util
'''

from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

from redis.asyncio import Redis


@dataclass
class RedisData:
    """A key/value pair to store in Redis, with an optional time-to-live."""

    # Redis key under which the value is stored.
    key: bytes | str
    # Payload written to the key.
    value: bytes | str
    # Expiry as whole seconds or a timedelta; a falsy value (None, 0,
    # timedelta(0)) means no expiry is applied by RedisUtil.set.
    ttl: Optional[int | timedelta] = None


class RedisUtil:
    """Thin async wrapper around a ``redis.asyncio.Redis`` client."""

    def __init__(self, client: Optional[Redis] = None) -> None:
        """Create the helper.

        Args:
            client: An already configured async Redis client. When omitted,
                a client with redis-py's default connection settings is
                created.
        """
        self.client = client if client is not None else Redis()

    async def set(
        self, redis_data: RedisData, *, is_transaction: bool = False
    ) -> None:
        """Store ``redis_data`` and, if it carries a TTL, set its expiry.

        Both commands are queued on one pipeline and sent together by
        ``execute()``.

        Args:
            redis_data: The key, value and optional TTL to write.
            is_transaction: Passed to the pipeline as ``transaction``; when
                true the queued commands are wrapped in MULTI/EXEC.
        """
        async with self.client.pipeline(transaction=is_transaction) as pipe:
            await pipe.set(redis_data.key, redis_data.value)
            # Truthiness check on purpose: a TTL of 0 is treated as "no
            # expiry" rather than being forwarded to EXPIRE.
            if redis_data.ttl:
                await pipe.expire(redis_data.key, redis_data.ttl)
            await pipe.execute()

    async def get(self, key: bytes | str) -> Optional[bytes | str]:
        """Return the value stored at ``key``, or ``None`` if it is missing.

        The value is ``bytes`` unless the client was created with
        ``decode_responses=True``.
        """
        return await self.client.get(key)

    async def delete(self, key: bytes | str) -> int:
        """Delete ``key`` and return the number of keys removed."""
        return await self.client.delete(key)