from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q, data):
print('Process to write: %s' % os.getpid())
for value in data:
print('{} :Put {} to queue...'.format(os.getpid(), value))
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('{} :Get {} from queue.'.format(os.getpid(), value))
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw1 = Process(target=write, args=(q,[1,2,3]))
pw2 = Process(target=write, args=(q,[4,5,6]))
pr1 = Process(target=read, args=(q,))
pr2 = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw1.start()
pw2.start()
# 启动子进程pr,读取:
pr1.start()
pr2.start()
# 等待pw结束:
pw1.join()
pw2.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr1.terminate()
pr2.terminate()
输出结果(不唯一):
Process to write: 683947
683947 :Put 1 to queue...
Process to write: 683948
683948 :Put 4 to queue...
Process to read: 683949
683949 :Get 1 from queue.
683949 :Get 4 from queue.
Process to read: 683951
683947 :Put 2 to queue...
683949 :Get 2 from queue.
683948 :Put 5 to queue...
683951 :Get 5 from queue.
683947 :Put 3 to queue...
683949 :Get 3 from queue.
683948 :Put 6 to queue...
683951 :Get 6 from queue.
from multiprocessing import Pool, Queue, Manager
import os, time, random
# 写数据进程执行的代码:
def write(q, data):
print('Process to write: %s' % os.getpid())
for value in data:
print('{} :Put {} to queue...'.format(os.getpid(), value))
q.put(value)
time.sleep(random.random())
return data
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('{} :Get {} from queue.'.format(os.getpid(), value))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(2)
q = Manager().Queue()
for i in range(5):
p.apply_async(write, args=(q,[i]))
for i in range(5):
p.apply_async(read, args=(q,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
p.terminate()
print('All subprocesses done.')
结果是(不唯一):
Parent process 1696274.
Waiting for all subprocesses done...
Process to write: 1696276
1696276 :Put 1 to queue...
Process to write: 1696275
1696275 :Put 0 to queue...
Process to write: 1696276
1696276 :Put 2 to queue...
Process to write: 1696276
1696276 :Put 3 to queue...
Process to write: 1696275
1696275 :Put 4 to queue...
Process to read: 1696275
1696275 :Get 1 from queue.
1696275 :Get 0 from queue.
1696275 :Get 2 from queue.
1696275 :Get 3 from queue.
1696275 :Get 4 from queue.
Process to read: 1696276
from concurrent.futures import ThreadPoolExecutor
import time
def sayhello(a):
print("hello: "+a)
time.sleep(2)
def main():
seed=["a","b","c"]
start1=time.time()
for each in seed:
sayhello(each)
end1=time.time()
print("time1: "+str(end1-start1))
start2=time.time()
with ThreadPoolExecutor(3) as executor:
for each in seed:
executor.submit(sayhello,each)
end2=time.time()
print("time2: "+str(end2-start2))
start3=time.time()
with ThreadPoolExecutor(3) as executor1:
executor1.map(sayhello,seed)
end3=time.time()
print("time3: "+str(end3-start3))
if __name__ == '__main__':
main()
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)