redis_util.py 1007 B

1234567891011121314151617181920212223242526272829303132333435
  1. #!/usr/bin/env python
  2. # -*- encoding: utf-8 -*-
  3. '''
  4. @Contact : liuyuqi.gov@msn.cn
  5. @Time : 2024/03/25 13:59:44
  6. @License : Copyright © 2017-2022 liuyuqi. All Rights Reserved.
  7. @Desc : redis util
  8. '''
  9. from datetime import timedelta
  10. from redis.asyncio import Redis
  11. from typing import Optional
  12. class RedisData:
  13. key: bytes | str
  14. value: bytes |str
  15. ttl: Optional[int|timedelta]=None
  16. class RedisUtil:
  17. def __init(self):
  18. self.client=Redis()
  19. async def set(self, redis_data: RedisData, *,is_transaction: bool=False)->None:
  20. async with self.client.pipeline(transaction=is_transaction) as pipe:
  21. await pipe.set(redis_data.key, redis_data.value)
  22. if redis_data.ttl:
  23. await pipe.expire(redis_data.key, redis_data.ttl)
  24. await pipe.execute()
  25. async def get(self, key: str) -> Optional[str]:
  26. return await self.client.get(key)
  27. async def delete(self, key: str):
  28. return await self.client.delete(key)