今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx()
有了ConnectionPool这个类之后,可以使用如下方法
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx) r = redis.Redis(connection_pool=pool)
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
class StrictRedis(object): ........ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): if not connection_pool: .......... connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
# COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) #调用ConnectionPool.get_connection方法获取一个连接 try: connection.send_command(*args) #命令执行,这里为Connection.send_command return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection) #调用ConnectionPool.release释放连接
在来看看ConnectionPool类:
class ConnectionPool(object): ........... def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): #类初始化时调用构造函数 max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: #判断输入的max_connections是否合法 raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class #设置对应的参数 self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() #初始化ConnectionPool 时的reset操作 def reset(self): self.pid = os.getpid() self._created_connections = 0 #已经创建的连接的计数器 self._available_connections = [] #声明一个空的数组,用来存放可用的连接 self._in_use_connections = set() #声明一个空的集合,用来存放已经在用的连接 self._check_lock = threading.Lock() ....... def get_connection(self, command_name, *keys, **options): #在连接池中获取连接的方法 "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组, 会直接调用make_connection方法 except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) #向代表正在使用的连接的集合中添加元素 return connection def make_connection(self): #在_available_connections数组为空时获取连接调用的方法 "Create a new connection" if self._created_connections >= self.max_connections: #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化 raise ConnectionError("Too many connections") self._created_connections += 1 #把代表已经创建的连接的数值+1 return self.connection_class(**self.connection_kwargs) #返回有效的连接,默认为Connection(**self.connection_kwargs) def release(self, connection): #释放连接,链接并没有断开,只是存在链接池中 "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) #从集合中删除元素 self._available_connections.append(connection) #并添加到_available_connections 的数组中 def disconnect(self): #断开所有连接池中的链接 "Disconnects all connections in the pool" all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: connection.disconnect()
execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
class Connection(object): "Manages TCP communication to and from a Redis server" def __del__(self): #对象删除时的操作,调用disconnect释放连接 try: self.disconnect() except Exception: pass
核心的链接建立方法是通过socket模块实现:
def _connect(self): err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.socket_keepalive: #构造函数中默认 socket_keepalive=False,因此这里默认为短连接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式 # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) #构造函数中默认socket_timeout=None return sock except socket.error as _: err = _ if sock is not None: sock.close() .....
关闭链接的方法:
def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close self._sock.close() except socket.error: pass self._sock = None
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。
本文出自 "菜光光的博客" 博客,请务必保留此出处http://caiguangguang.blog.51cto.com/1652935/1583541
【需求说明】
对Windows Server 2008 R2系统状态进行备份,且需要保留三天的系统状态备份数据,但是我们会发现Windows Server Backup并不能够创建多个备份计划,不能设置备份副本的保存数量等,这时候怎么处理了?
【解决方法】
如果大家用过Windows Server Backup,那么大家一定知道在Windows Server Backup的界面上,只能建一个备份计划(我也常识找过但未找到可以配置多个备份计划的地方),而且备份选项中只有每天一次和每天多次的备份选项,根本没有几天一次的选项,而Windows Server Backup对于其备份副本的保存有两种机制:在本地目录或直连磁盘上,它会保留每一个备份副本,直到备份空间占满才会删除第一个备份副本来储存下一个新的备份副本,而且每一个备份副本之间是叠加关系,不能单独分离出来使用;而在远程目录上,它只会保留唯一一个最新的备份副本,每次均为覆盖备份。
其实只要好好研究一下,这个功能是可以实现的,只是操作相对复杂一点,通过备份计划我们可以看到它实际是通过计划任务的方式,还进行调度备份任务的,如果你仔细去查看会发现计划任务里面会有它的跟踪,所以我们可以通过计划任务来进行处理此需求。
【实验步骤】
如果你是存储在共享磁盘中可以采用以下方式操作
1、在共享目录中新建三个目录(满足保留三天备份的需求);
2、在Windows Server Backup中创建备份计划
3、点击"下一步"
4、选择"自定义"点击"下一步"
5、"添加项"选择"系统状态",点击"下一步"
6、选择"每一次"点击"下一步"
7、备份到共享网络文件夹,点击"下一步"
8、这里我们选择第一个目录,即backup01,点击"下一步"
9、配置具有执行任务计划的用户名密码,点击"确定"
10、点击完成
11、这时候我们在任务计划库里面就可以看到对应的任务计划
12、然后我们双击对应的计划任务,点击"编辑触发器",这里我们设置计划任务时间为每天,然后每隔两天备份一次;
13、然后我们导出备份配置,然后删除对应的任务计划;
14、这时候我们再在Windows Server Backup中新建备份计划,这次我们设置备份位置为backup02目录;
15、同样的我们修改对应的备份计划任务的备份时间为每天,每隔两天备份一次,这里有一个地方需要注意,就是开始计划任务时间,因为我们前面一个已经设置了2014年11月25日,所以我们这个计划任务要设置后一天即2014年11月26日,这样子才能够实现三个备份日期顺序性;
16、同样的我们再导出计划任务保存起来,以备后面所需;
17、然后我们再在Windows Server Backup中设置第三个备份计划,这此设置位置放在backup03目录中;
18、同样的我们这里需要设置它的计划任务时间为每隔两天备份一次,但开始时间要在2014年11月26日的时间上再加一天,即2014年11月27日;
19、这里要注意不要再删除现在的备份任务计划了,不然后面Windows Server Backup备份计划会不成功的,这时候我们点击"导入任务",
没有评论:
发表评论