发布网友
共2个回答
懂视网
import select
2 import socket
3 import sys
4 import queue
5
6
7 server = socket.socket()
8 server.setblocking(0)
9
10 server_addr = (‘localhost‘,10000)
11
12 print(‘starting up on %s port %s‘ % server_addr)
13 server.bind(server_addr)
14
15 server.listen(5)
16
17
18 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
19 outputs = []
20
21 message_queues = {}
22
23 while True:
24 print("waiting for next event...")
25
26 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里
27
28 for s in readable: #每个s就是一个socket
29
30 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
31 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
32 #新连接进来了,接受这个连接
33 conn, client_addr = s.accept()
34 print("new connection from",client_addr)
35 conn.setblocking(0)
36 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
37 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
38 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
39
40 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
41
42 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
43 #客户端的数据过来了,在这接收
44 data = s.recv(1024)
45 if data:
46 print("收到来自[%s]的数据:" % s.getpeername()[0], data)
47 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
48 if s not in outputs:
49 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
50
51
52 else:#如果收不到data代表什么呢? 代表客户端断开了呀
53 print("客户端断开了",s)
54
55 if s in outputs:
56 outputs.remove(s) #清理已断开的连接
57
58 inputs.remove(s) #清理已断开的连接
59
60 del message_queues[s] ##清理已断开的连接
61
62
63 for s in writeable:
try :
65 next_msg = message_queues[s].get_nowait()
66
67 except queue.Empty:
68 print("client [%s]" %s.getpeername()[0], "queue is empty..")
69 outputs.remove(s)
70
71 else:
72 print("sending msg to [%s]"%s.getpeername()[0], next_msg)
73 s.send(next_msg.upper())
74
75
76 for s in exeptional:
77 print("handling exception for ",s.getpeername())
78 inputs.remove(s)
79 if s in outputs:
80 outputs.remove(s)
81 s.close()
82
83 del message_queues[s]
server
1 import socket 2 import sys 3 4 messages = [ b‘This is the message. ‘, 5 b‘It will be sent ‘, 6 b‘in parts.‘, 7 ] 8 server_address = (‘localhost‘, 10000) 9 10 # Create a TCP/IP socket 11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 12 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 13 ] 14 15 # Connect the socket to the port where the server is listening 16 print(‘connecting to %s port %s‘ % server_address) 17 for s in socks: 18 s.connect(server_address) 19 20 for message in messages: 21 22 # Send messages on both sockets 23 for s in socks: 24 print(‘%s: sending "%s"‘ % (s.getsockname(), message) ) 25 s.send(message) 26 27 # Read responses on both sockets 28 for s in socks: 29 data = s.recv(1024) 30 print( ‘%s: received "%s"‘ % (s.getsockname(), data) ) 31 if not data: 32 print(sys.stderr, ‘closing socket‘, s.getsockname() )client
selectors模块
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print(‘accepted‘, conn, ‘from‘, addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print(‘echoing‘, repr(data), ‘to‘, conn) 16 conn.send(data) # Hope it won‘t block 17 else: 18 print(‘closing‘, conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind((‘localhost‘, 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)selectors
开发堡垒机之前,先来学习Python的paramiko模块,该模块机遇SSH用于连接远程服务器并执行相关操作
SSHClient
用于连接远程服务器并执行基本命令
基于用户名密码连接:
1 import paramiko 2 3 # 创建SSH对象 4 ssh = paramiko.SSHClient() 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, password=‘123‘) 9 10 # 执行命令 11 stdin, stdout, stderr = ssh.exec_command(‘df‘) 12 # 获取命令结果 13 result = stdout.read() 14 15 # 关闭连接 16 ssh.close()
1 import paramiko 2 3 transport = paramiko.Transport((‘hostname‘, 22)) 4 transport.connect(username=‘wupeiqi‘, password=‘123‘) 5 6 ssh = paramiko.SSHClient() 7 ssh._transport = transport 8 9 stdin, stdout, stderr = ssh.exec_command(‘df‘) 10 print stdout.read() 11 12 transport.close()
基于公钥密钥连接:
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘) 4 5 # 创建SSH对象 6 ssh = paramiko.SSHClient() 7 # 允许连接不在know_hosts文件中的主机 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 9 # 连接服务器 10 ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, key=private_key) 11 12 # 执行命令 13 stdin, stdout, stderr = ssh.exec_command(‘df‘) 14 # 获取命令结果 15 result = stdout.read() 16 17 # 关闭连接 18 ssh.close()
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘) 4 5 transport = paramiko.Transport((‘hostname‘, 22)) 6 transport.connect(username=‘wupeiqi‘, pkey=private_key) 7 8 ssh = paramiko.SSHClient() 9 ssh._transport = transport 10 11 stdin, stdout, stderr = ssh.exec_command(‘df‘) 12 13 transport.close()SSHClient 封装 Transport
1 import paramiko 2 from io import StringIO 3 4 key_str = """-----BEGIN RSA PRIVATE KEY----- 5 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8 6 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans 7 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e 8 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC 9 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP 10 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A 11 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+ 12 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh 13 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8 14 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz 15 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6 16 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC 17 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl 18 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq 19 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65 20 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA 21 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM 22 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH 23 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8 24 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg 25 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/ 26 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj 27 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw 28 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI 29 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0= 30 -----END RSA PRIVATE KEY-----""" 31 32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str)) 33 transport = paramiko.Transport((‘10.0.1.40‘, 22)) 34 transport.connect(username=‘wupeiqi‘, pkey=private_key) 35 36 ssh = paramiko.SSHClient() 37 ssh._transport = transport 38 39 stdin, stdout, stderr = ssh.exec_command(‘df‘) 40 result = stdout.read() 41 42 transport.close() 43 44 print(result)基于私钥字符串进行连接
SFTPClient
用于连接远程服务器并执行上传下载
基于用户名密码上传下载
1 import paramiko 2 3 transport = paramiko.Transport((‘hostname‘,22)) 4 transport.connect(username=‘wupeiqi‘,password=‘123‘) 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 将location.py 上传至服务器 /tmp/test.py 8 sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘) 9 # 将remove_path 下载到本地 local_path 10 sftp.get(‘remove_path‘, ‘local_path‘) 11 12 transport.close()
基于公钥密钥上传下载
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘) 4 5 transport = paramiko.Transport((‘hostname‘, 22)) 6 transport.connect(username=‘wupeiqi‘, pkey=private_key ) 7 8 sftp = paramiko.SFTPClient.from_transport(transport) 9 # 将location.py 上传至服务器 /tmp/test.py 10 sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘) 11 # 将remove_path 下载到本地 local_path 12 sftp.get(‘remove_path‘, ‘local_path‘) 13 14 transport.close()
安装 rabbitMA
http://www.cnblogs.com/ericli-ericli/p/5902270.html
http://blog.csdn.net/zyz511919766/article/details/41946521
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 host=‘localhost‘)) 4 channel = connection.channel() 5 6 channel.queue_declare(queue=‘hello‘) 7 8 channel.basic_publish(exchange=‘‘, 9 routing_key=‘hello‘, 10 body=‘Hello World!‘) 11 print(" [x] Sent ‘Hello World!‘") 12 connection.close()生产者
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 host=‘localhost‘)) 4 channel = connection.channel() 5 6 channel.queue_declare(queue=‘hello‘) 7 8 def callback(ch, method, properties, body): 9 print(" [x] Received %r" % body) 10 11 channel.basic_consume(callback, 12 queue=‘hello‘, 13 no_ack=True) 14 15 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 16 channel.start_consuming()消费者
1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host=‘10.211.55.4‘)) 5 channel = connection.channel() 6 7 channel.queue_declare(queue=‘hello‘) 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 import time 12 time.sleep(10) 13 print ‘ok‘ 14 ch.basic_ack(delivery_tag = method.delivery_tag) 15 16 channel.basic_consume(callback, 17 queue=‘hello‘, 18 no_ack=False) 19 20 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 21 channel.start_consuming()消费者
2、durable 消息不丢失
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘, durable=True) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent ‘Hello World!‘") connection.close()生产者
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue=‘hello‘, durable=True) 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print ‘ok‘ 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_consume(callback, 18 queue=‘hello‘, 19 no_ack=False) 20 21 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 22 channel.start_consuming()消费者
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,但是在消费者端,配置prefetch_count
=
1
,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.211.55.4‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ‘ok‘ ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()消费者
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=‘localhost‘)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=‘logs‘, 9 type=‘fanout‘) 10 11 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange=‘logs‘, 13 routing_key=‘‘, 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()发布者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘logs‘, queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()订阅者
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=‘localhost‘)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=‘direct_logs‘, 9 type=‘direct‘) 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 17 sys.exit(1
热心网友
前言:本文主要讲解LinuxIO调度层的三种模式:cfp、deadline和noop,并给出各自的优化和适用场景建议。IO调度发生在Linux内核的IO调度层。这个层次是针对Linux的整体IO层次体系来说的。从read()或者write()系统调用的角度来说,Linux整体IO体系可以分为七层,它们分别是:VFS层:虚拟文件系统层。由于内核要跟多种文件系统打交道,而每一种文件系统所实现的数据结构和相关方法都可能不尽相同,所以,内核抽象了这一层,专门用来适配各种文件系统,并对外提供统一操作接口。文件系统层:不同的文件系统实现自己的操作过程,提供自己特有的特征,具体不多说了,大家愿意的话自己去看代码即可。页缓存层:负责真对page的缓存。通用块层:由于绝大多数情况的io操作是跟块设备打交道,所以Linux在此提供了一个类似vfs层的块设备操作抽象层。下层对接各种不同属性的块设备,对上提供统一的BlockIO请求标准。IO调度层:因为绝大多数的块设备都是类似磁盘这样的设备,所以有必要根据这类设备的特点以及应用的不同特点来设置一些不同的调度算法和队列。以便在不同的应用环境下有针对性的提高磁盘的读写效率,这里就是大名鼎鼎的Linux电梯所起作用的地方。针对机械硬盘的各种调度方法就是在这实现的。块设备驱动层:驱动层对外提供相对比较高级的设备操作接口,往往是C语言的,而下层对接设备本身的操作方法和规范。块设备层:这层就是具体的物理设备了,定义了各种真对设备操作方法和规范。有一个已经整理好的[LinuxIO结构图],非常经典,一图胜千言:我们今天要研究的内容主要在IO调度这一层。它要解决的核心问题是,如何提高块设备IO的整体性能?这一层也主要是针对机械硬盘结构而设计的。众所周知,机械硬盘的存储介质是磁盘,磁头在盘片上移动进行磁道寻址,行为类似播放一张唱片。这种结构的特点是,顺序访问时吞吐量较高,但是如果一旦对盘片有随机访问,那么大量的时间都会浪费在磁头的移动上,这时候就会导致每次IO的响应时间变长,极大的降低IO的响应速度。磁头在盘片上寻道的操作,类似电梯调度,实际上在最开始的时期,Linux把这个算法命名为Linux电梯算法,即:如果在寻道的过程中,能把顺序路过的相关磁道的数据请求都“顺便”处理掉,那么就可以在比较小影响响应速度的前提下,提高整体IO的吞吐量。这就是我们为什么要设计IO调度算法的原因。目前在内核中默认开启了三种算法/模式:noop,cfq和deadline。严格算应该是两种:因为第一种叫做noop,就是空操作调度算法,也就是没有任何调度操作,并不对io请求进行排序,仅仅做适当的io合并的一个fifo队列。目前内核中默认的调度算法应该是cfq,叫做完全公平队列调度。这个调度算法人如其名,它试图给所有进程提供一个完全公平的IO操作环境。注:请大家一定记住这个词语,cfq,完全公平队列调度,不然下文就没法看了。cfq为每个进程创建一个同步IO调度队列,并默认以时间片和请求数限定的方式分配IO资源,以此保证每个进程的IO资源占用是公平的,cfq还实现了针对进程级别的优先级调度,这个我们后面会详细解释。查看和修改IO调度算法的方法是:cfq是通用服务器比较好的IO调度算法选择,对桌面用户也是比较好的选择。但是对于很多IO压力较大的场景就并不是很适应,尤其是IO压力集中在某些进程上的场景。因为这种场景我们需要的满足某个或者某几个进程的IO响应速度,而不是让所有的进程公平的使用IO,比如数据库应用。deadline调度(最终期限调度)就是更适合上述场景的解决方案。deadline实现了四个队列:其中两个分别处理正常read和write,按扇区号排序,进行正常io的合并处理以提高吞吐量。因为IO请求可能会集中在某些磁盘位置,这样会导致新来的请求一直被合并,可能会有其他磁盘位置的io请求被饿死。另外两个处理超时read和write的队列,按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时(达到最终期限时间)的队列中的请求会优先被处理,防止请求被饿死。不久前,内核还是默认标配四种算法,还有一种叫做as的算法(Anticipatoryscheler),预测调度算法。一个高大上的名字,搞得我一度认为Linux内核都会算命了。结果发现,无非是在基于deadline算法做io调度的之前等一小会时间,如果这段时间内有可以合并的io请求到来,就可以合并处理,提高deadline调度的在顺序读写情况下的数据吞吐量。其实这根本不是啥预测,我觉得不如叫撞大运调度算法,当然这种策略在某些特定场景差效果不错。但是在大多数场景下,这个调度不仅没有提高吞吐量,还降低了响应速度,所以内核干脆把它从默认配置里删除了。毕竟Linux的宗旨是实用,而我们也就不再这个调度算法上多费口舌了。1、cfq:完全公平队列调度cfq是内核默认选择的IO调度队列,它在桌面应用场景以及大多数常见应用场景下都是很好的选择。如何实现一个所谓的完全公平队列(CompletelyFairQueueing)?首先我们要理解所谓的公平是对谁的公平?从操作系统的角度来说,产生操作行为的主体都是进程,所以这里的公平是针对每个进程而言的,我们要试图让进程可以公平的占用IO资源。那么如何让进程公平的占用IO资源?我们需要先理解什么是IO资源。当我们衡量一个IO资源的时候,一般喜欢用的是两个单位,一个是数据读写的带宽,另一个是数据读写的IOPS。带宽就是以时间为单位的读写数据量,比如,100Mbyte/s。而IOPS是以时间为单位的读写次数。在不同的读写情境下,这两个单位的表现可能不一样,但是可以确定的是,两个单位的任何一个达到了性能上限,都会成为IO的瓶颈。从机械硬盘的结构考虑,如果读写是顺序读写,那么IO的表现是可以通过比较少的IOPS达到较大的带宽,因为可以合并很多IO,也可以通过预读等方式加速数据读取效率。当IO的表现是偏向于随机读写的时候,那么IOPS就会变得更大,IO的请求的合并可能性下降,当每次io请求数据越少的时候,带宽表现就会越低。从这里我们可以理解,针对进程的IO资源的主要表现形式有两个:进程在单位时间内提交的IO请求个数和进程占用IO的带宽。其实无论哪个,都是跟进程分配的IO处理时间长度紧密相关的。有时业务可以在较少IOPS的情况下占用较大带宽,另外一些则可能在较大IOPS的情况下占用较少带宽,所以对进程占用IO的时间进行调度才是相对最公平的。即,我不管你是IOPS高还是带宽占用高,到了时间咱就换下一个进程处理,你爱咋样咋样。所以,cfq就是试图给所有进程分配等同的块设备使用的时间片,进程在时间片内,可以将产生的IO请求提交给块设备进行处理,时间片结束,进程的请求将排进它自己的队列,等待下次调度的时候进行处理。这就是cfq的基本原理。当然,现实生活中不可能有真正的“公平”,常见的应用场景下,我们很肯能需要人为的对进程的IO占用进行人为指定优先级,这就像对进程的CPU占用设置优先级的概念一样。所以,除了针对时间片进行公平队列调度外,cfq还提供了优先级支持。每个进程都可以设置一个IO优先级,cfq会根据这个优先级的设置情况作为调度时的重要参考因素。优先级首先分成三大类:RT、BE、IDLE,它们分别是实时(RealTime)、最佳效果(BestTry)和闲置(Idle)三个类别,对每个类别的IO,cfq都使用不同的策略进行处理。另外,RT和BE类别中,分别又再划分了8个子优先级实现更细节的QOS需求,而IDLE只有一个子优先级。另外,我们都知道内核默认对存储的读写都是经过缓存(buffer/cache)的,在这种情况下,cfq是无法区分当前处理的请求是来自哪一个进程的。只有在进程使用同步方式(syncread或者syncwirte)或者直接IO(DirectIO)方式进行读写的时候,cfq才能区分出IO请求来自哪个进程。所以,除了针对每个进程实现的IO队列以外,还实现了一个公共的队列用来处理异步请求。当前内核已经实现了针对IO资源的cgroup资源隔离,所以在以上体系的基础上,cfq也实现了针对cgroup的调度支持。总的来说,cfq用了一系列的数据结构实现了以上所有复杂功能的支持,大家可以通过源代码看到其相关实现,文件在源代码目录下的block/cfq-iosched.c。1.1cfq设计原理在此,我们对整体数据结构做一个简要描述:首先,cfq通过一个叫做cfq_data的数据结构维护了整个调度器流程。在一个支持了cgroup功能的cfq中,全部进程被分成了若干个contralgroup进行管理。每个cgroup在cfq中都有一个cfq_group的结构进行描述,所有的cgroup都被作为一个调度对象放进一个红黑树中,并以vdisktime为key进行排序。vdisktime这个时间纪录的是当前cgroup所占用的io时间,每次对cgroup进行调度时,总是通过红黑树选择当前vdisktime时间最少的cgroup进行处理,以保证所有cgroups之间的IO资源占用“公平”。当然我们知道,cgroup是可以对blkio进行资源比例分配的,其作用原理就是,分配比例大的cgroup占用vdisktime时间增长较慢,分配比例小的vdisktime时间增长较快,快慢与分配比例成正比。这样就做到了不同的cgroup分配的IO比例不一样,并且在cfq的角度看来依然是“公平“的。选择好了需要处理的cgroup(cfq_group)之后,调度器需要决策选择下一步的service_tree。service_tree这个数据结构对应的都是一系列的红黑树,主要目的是用来实现请求优先级分类的,就是RT、BE、IDLE的分类。每一个cfq_group都维护了7个service_trees,其定义如下:其中service_tree_idle就是用来给IDLE类型的请求进行排队用的红黑树。而上面二维数组,首先第一个维度针对RT和BE分别各实现了一个数组,每一个数组中都维护了三个红黑树,分别对应三种不同子类型的请求,分别是:SYNC、SYNC_NOIDLE以及ASYNC。我们可以认为SYNC相当于SYNC_IDLE并与SYNC_NOIDLE对应。idling是cfq在设计上为了尽量合并连续的IO请求以达到提高吞吐量的目的而加入的机制,我们可以理解为是一种“空转”等待机制。空转是指,当一个队列处理一个请求结束后,会在发生调度之前空等一小会时间,如果下一个请求到来,则可以减少磁头寻址,继续处理顺序的IO请求。为了实现这个功能,cfq在service_tree这层数据结构这实现了SYNC队列,如果请求是同步顺序请求,就入队这个servicetree,如果请求是同步随机请求,则入队SYNC_NOIDLE队列,以判断下一个请求是否是顺序请求。所有的异步写操作请求将入队ASYNC的servicetree,并且针对这个队列没有空转等待机制。此外,cfq还对SSD这样的硬盘有特殊调整,当cfq发现存储设备是一个ssd硬盘这样的队列深度更大的设备时,所有针对单独队列的空转都将不生效,所有的IO请求都将入队SYNC_NOIDLE这个servicetree。每一个servicetree都对应了若干个cfq_queue队列,每个cfq_queue队列对应一个进程,这个我们后续再详细说明。cfq_group还维护了一个在cgroup内部所有进程公用的异步IO请求队列,其结构如下:异步请求也分成了RT、BE、IDLE这三类进行处理,每一类对应一个cfq_queue进行排队。BE和RT也实现了优先级的支持,每一个类型有IOPRIO_BE_NR这么多个优先级,这个值定义为8,数组下标为0-7。我们目前分析的内核代码版本为Linux4.4,可以看出,从cfq的角度来说,已经可以实现异步IO的cgroup支持了,我们需要定义一下这里所谓异步IO的含义,它仅仅表示从内存的buffer/cache中的数据同步到硬盘的IO请求,而不是aio(man7aio)或者linux的native异步io以及lio机制,实际上这些所谓的“异步”IO机制,在内核中都是同步实现的(本质上冯诺伊曼计算机没有真正的“异步”机制)。我们在上面已经说明过,由于进程正常情况下都是将数据先写入buffer/cache,所以这种异步IO都是统一由cfq_group中的async请求队列处理的。那么为什么在上面的service_tree中还要实现和一个ASYNC的类型呢?这当然是为了支持区分进程的异步IO并使之可以“完全公平”做准备喽。实际上在最新的cgroupv2的blkio体系中,内核已经支持了针对bufferIO的cgroup限速支持,而以上这些可能容易混淆的一堆类型,都是在新的体系下需要用到的类型标记。新体系的复杂度更高了,功能也更加强大,但是大家先不要着急,正式的cgroupv2体系,在Linux4.5发布的时候会正式跟大家见面。我们继续选择service_tree的过程,三种优先级类型的service_tree的选择就是根据类型的优先级来做选择的,RT优先级最高,BE其次,IDLE最低。就是说,RT里有,就会一直处理RT,RT没了再处理BE。每个service_tree对应一个元素为cfq_queue排队的红黑树,而每个cfq_queue就是内核为进程(线程)创建的请求队列。每一个cfq_queue都会维护一个rb_key的变量,这个变量实际上就是这个队列的IO服务时间(servicetime)。这里还是通过红黑树找到servicetime时间最短的那个cfq_queue进行服务,以保证“完全公平”。选择好了cfq_queue之后,就要开始处理这个队列里的IO请求了。这里的调度方式基本跟deadline类似。cfq_queue会对进入队列的每一个请求进行两次入队,一个放进fifo中,另一个放进按访问扇区顺序作为key的红黑树中。默认从红黑树中取请求进行处理,当请求的延时时间达到deadline时,就从红黑树中取等待时间最长的进行处理,以保证请求不被饿死。这就是整个cfq的调度流程,当然其中还有很多细枝末节没有交代,比如合并处理以及顺序处理等等。1.2cfq的参数调整理解整个调度流程有助于我们决策如何调整cfq的相关参数。所有cfq的可调参数都可以在/sys/class/block/sda/queue/iosched/目录下找到,当然,在你的系统上,请将sda替换为相应的磁盘名称。我们来看一下都有什么:这些参数部分是跟机械硬盘磁头寻道方式有关的,如果其说明你看不懂,请先补充相关知识:back_seek_max:磁头可以向后寻址的最大范围,默认值为16M。back_seek_penalty:向后寻址的惩罚系数。这个值是跟向前寻址进行比较的。以上两个是为了防止磁头寻道发生抖动而导致寻址过慢而设置的。基本思路是这样,一个io请求到来的时候,cfq会根据其寻址位置预估一下其磁头寻道成本。设置一个最大值back_seek_max,对于请求所访问的扇区号在磁头后方的请求,只要寻址范围没有超过这个值,cfq会像向前寻址的请求一样处理它。再设置一个评估成本的系数back_seek_penalty,相对于磁头向前寻址,向后寻址的距离为1/2(1/back_seek_penalty)时,cfq认为这两个请求寻址的代价是相同。这两个参数实际上是cfq判断请求合并处理的条件*,凡事复合这个条件的请求,都会尽量在本次请求处理的时候一起合并处理。fifo_expire_async:设置异步请求的超时时间。同步请求和异步请求是区分不同队列处理的,cfq在调度的时候一般情况都会优先处理同步请求,之后再处理异步请求,除非异步请求符合上述合并处理的条件*范围内。当本进程的队列被调度时,cfq会优先检查是否有异步请求超时,就是超过fifo_expire_async参数的*。如果有,则优先发送一个超时的请求,其余请求仍然按照优先级以及扇区编号大小来处理。fifo_expire_sync:这个参数跟上面的类似,区别是用来设置同步请求的超时时间。slice_idle:参数设置了一个等待时间。这让cfq在切换cfq_queue或servicetree的时候等待一段时间,目的是提高机械硬盘的吞吐量。一般情况下,来自同一个cfq_queue或者servicetree的IO请求的寻址局部性更好,所以这样可以减少磁盘的寻址次数。这个值在机械硬盘上默认为非零。当然在固态硬盘或者硬RAID设备上设置这个值为非零会降低存储的效率,因为固态硬盘没有磁头寻址这个概念,所以在这样的设备上应该设置为0,关闭此功能。group_idle:这个参数也跟上一个参数类似,区别是当cfq要切换cfq_group的时候会等待一段时间。在cgroup的场景下,如果我们沿用slice_idle的方式,那么空转等待可能会在cgroup组内每个进程的cfq_queue切换时发生。这样会如果这个进程一直有请求要处理的话,那么直到这个cgroup的配额被耗尽,同组中的其它进程也可能无法被调度到。这样会导致同组中的其它进程饿死而产生IO性能瓶颈。在这种情况下,我们可以将slice_idle=0而group_idle=8。这样空转等待就是以cgroup为单位进行的,而不是以cfq_queue的进程为单位进行,以防止上述问题产生。low_latency:这个是用来开启或关闭cfq的低延时(lowlatency)模式的开关。当这个开关打开时,cfq将会根据target_latency的参数设置来对每一个进程的分片时间(slicetime)进行重新计算。这将有利于对吞吐量的公平(默认是对时间片分配的公平)。关闭这个参数(设置为0)将忽略target_latency的值。这将使系统中的进程完全按照时间片方式进行IO资源分配。这个开关默认是打开的。我们已经知道cfq设计上有“空转”(idling)这个概念,目的是为了可以让连续的读写操作尽可能多的合并处理,减少磁头的寻址操作以便增大吞吐量。如果有进程总是很快的进行顺序读写,那么它将因为cfq的空转等待命中率很高而导致其它需要处理IO的进程响应速度下降,如果另一个需要调度的进程不会发出大量顺序IO行为的话,系统中不同进程IO吞吐量的表现就会很不均衡。就比如,系统内存的cache中有很多脏页要写回时,桌面又要打开一个浏览器进行操作,这时脏页写回的后台行为就很可能会大量命中空转时间,而导致浏览器的小量IO一直等待,让用户感觉浏览器运行响应速度变慢。这个low_latency主要是对这种情况进行优化的选项,当其打开时,系统会根据target_latency的配置对因为命中空转而大量占用IO吞吐量的进程进行*,以达到不同进程IO占用的吞吐量的相对均衡。这个开关比较合适在类似桌面应用的场景下打开。target_latency:当low_latency的值为开启状态时,cfq将根据这个值重新计算每个进程分配的IO时间片长度。quantum:这个参数用来设置每次从cfq_queue中处理多少个IO请求。在一个队列处理事件周期中,超过这个数字的IO请求将不会被处理。这个参数只对同步的请求有效。slice_sync:当一个cfq_queue队列被调度处理时,它可以被分配的处理总时间是通过这个值来作为一个计算参数指定的。公式为:time_slice=slice_sync+(slice_sync/5*(4-prio))。这个参数对同步请求有效。slice_async:这个值跟上一个类似,区别是对异步请求有效。slice_async_rq:这个参数用来*在一个slice的时间范围内,一个队列最多可以处理的异步请求个数。请求被处理的最大个数还跟相关进程被设置的io优先级有关。1.3cfq的IOPS模式我们已经知道,默认情况下cfq是以时间片方式支持的带优先级的调度来保证IO资源占用的公平。高优先级的进程将得到的时间片长度,而低优先级的进程时间片相对较小。当我们的存储是一个高速并且支持NCQ(原生指令队列)的设备的时候,我们最好可以让其可以从多个cfq队列中处理多路的请求,以便提升NCQ的利用率。此时使用时间片的分配方式分配资源就显得不合时宜了,因为基于时间片的分配,同一时刻最多能处理的请求队列只有一个。这时,我们需要切换cfq的模式为IOPS模式。切换方式很简单,就是将slice_idle=0即可。内核会自动检测你的存储设备是否支持NCQ,如果支持的话cfq会自动切换为IOPS模式。另外,在默认的基于优先级的时间片方式下,我们可以使用ionice命令来调整进程的IO优先级。进程默认分配的IO优先级是根据进程的nice值计算而来的,计算方法可以在manionice中看到,这里不再废话。2、deadline:最终期限调度deadline调度算法相对cfq要简单很多。其设计目标是:在保证请求按照设备扇区的顺序进行访问的同时,兼顾其它请求不被饿死,要在一个最终期限前被调度到。我们知道磁头对磁盘的寻道是可以进行顺序访问和随机访问的,因为寻道延时时间的关系,顺序访问时IO的吞吐量更大,随机访问的吞吐量小。如果我们想为一个机械硬盘进行吞吐量优化的话,那么就可以让调度器按照尽量复合顺序访问的IO请求进行排序,之后请求以这样的顺序发送给硬盘,就可以使IO的吞吐量更大。但是这样做也有另一个问题,就是如果此时出现了一个请求,它要访问的磁道离目前磁头所在磁道很远,应用的请求又大量集中在目前磁道附近。导致大量请求一直会被合并和插队处理,而那个要访问比较远磁道的请求将因为一直不能被调度而饿死。deadline就是这样一种调度器,能在保证IO最大吞吐量的情况下,尽量使远端请求在一个期限内被调度而不被饿死的调度器。