Python堆,Python之队列和数据库

 2023-09-24 阅读 16 评论 0

摘要:一 数据库 Python堆?Python可以和数据库进行交互,这里也有一些交互的模块,比如MySQLdb、pymysql等,但是3.x 已经不再支持MySQLdb,所以你安装的时候会报错 python 优先队列?我们以pymysql为例子: 1.1 创建连接 pymysql有几种创建连接的方

一 数据库

Python堆?Python可以和数据库进行交互,这里也有一些交互的模块,比如MySQLdb、pymysql等,但是3.x 已经不再支持MySQLdb,所以你安装的时候会报错

 

python 优先队列?我们以pymysql为例子:

1.1 创建连接

pymysql有几种创建连接的方式

import pymysql
conn = pymysql.Connect(host="localhost",port=3306,user="root",password="123456",database="employee",charset
='utf8')
conn1 = pymysql.connect(host="localhost",port=3306,user="root",password="123456",database="ofbiz",charset=
'utf8')
conn2 = pymysql.Connection(host="localhost",port=3306,user="root",password="123456",database="ofbiz",charset=
'utf8')

其实connect==Connection=Connect

1.2 简单的例子

# 创建游标
cursor = conn.cursor()# 查询操作,并返回收影响行数
effect_row = cursor.execute("select * from empinfo")# 更新操作,并返回受影响行数
effect_row = cursor.execute("update empinfo set DeptID = 3 where empID = %s", (1,))# 创建多条记录,并返回受影响行数,执行多次
effect_row = cursor.executemany("insert into deptinfo(DeptName) values(%s)", [("财务部"),("公关部")])
print(effect_row)# 提交,不然无法保存新建或者修改的数据
conn.commit()# 关闭游标
cursor.close()
# 关闭连接
conn.close()

 

1.3 获取查询数据

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,
db,charset)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice")
# 获取剩余结果的第一行数据
row_1 = cursor.fetchone()
# 获取剩余结果前n行数据
rows_n = cursor.fetchmany(3)# 获取剩余结果所有数据
rows_all = cursor.fetchall()
utils.foreach(rows_all)
utils.commitAndClose(conn,cursor)

 

1.4 获取新创建数据的自增ID

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,
db,charset)
cursor = conn.cursor()
effect_row = cursor.executemany("insert into deptinfo(DeptName) values(%s)", [("市场部"), ("广告部")])
new_id = cursor.lastrowid
print(new_id)
utils.commitAndClose(conn,cursor)

 

1.5 移动游标

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice")
# 移动游标:在fetch数据时按照顺序进行,可以使用cursor.scroll(num, mode)来移动游标位置,如
# 0开始移动2行数据
cursor.scroll(2,"absolute")
row_1 = cursor.fetchone()
# 结果应该是第三行的数据 8002
utils.foreach(row_1)
# 从当前游标8003开始往下移动2行数据,
cursor.scroll(2, "relative")
# 获取剩余结果的第一行数据,即8005
row_1 = cursor.fetchone()
utils.foreach(row_1)
utils.commitAndClose(conn,cursor)

 

1.6 fetch数据类型

fetch默认返回的是一个元组类型的数据,如果你希望返回字典类型数据,怎么办?

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,
db,charset)
cursor = conn.cursor()
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# 查询数据
cursor.execute("SELECT invoice_id,invoice_type_id,party_id_from,party_id,status_id,invoice_date FROM invoice")
row_1 = cursor.fetchone()
utils.foreachMap(row_1)
utils.commitAndClose(conn,cursor)

 

1.7 调用存储过程

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# 调用无参存储过程  等价于cursor.execute("call p2()")
cursor.callproc('p2')
# 调用有参存储过程
cursor.callproc('p1', args=("nicky",28))
# 获取执行完存储的参数,参数@开头
cursor.execute("select @p1,@_p1_1,")
row_1 = cursor.fetchone()
utils.foreachMap(row_1)
utils.commitAndClose(conn,cursor)

 

1.8 防止pymysql SQL 注入

第一:字符串拼接造成的注入

比如:

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
user = "jacky' or '1'-- "
deptno = "u1pass"
sql = "select username,job from empinfo where username='%s' and deptno='%s'" % (user, deptno)# 拼接语句被构造成下面这样,永真条件,此时就注入成功了。因此要避免这种情况需使用pymysql提供的参数化查询。
# SELECT username,job From empinfo WHERE username='jacky' or 1'-- and deptno='%s'" % (user, deptno)
row_count = cursor.execute(sql)
row_1 = cursor.fetchone()
utils.commitAndClose(conn,cursor)

解决办法: 使用pymysql提供的参数化查询

utils = DBUtils()
conn = utils.getConnection(host,port,user,password,db,charset)
cursor = conn.cursor()
user = "jacky' or '1'-- "
deptno = "u1pass"
# 避免注入,使用pymysql提供的参数化语句
row_count = cursor.execute("select username,job from empinfo where username=%s and deptno=%s" % (user, deptno))
row_1 = cursor.fetchone()
utils.commitAndClose(conn,cursor)

它在内部解析 的时候,会对特殊进行转义

第二:使用存mysql储过程动态执行SQL防注入

 

1.9 使用with简化连接过程

每次都连接关闭很麻烦,使用上下文管理,简化连接过程

 

import pymysql
import contextlib
#定义上下文管理器,连接后自动关闭连接
@contextlib.contextmanager
def mysql(host, port, user, password, db, charset='utf8'):conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db, charset=charset)cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)try:yield cursorfinally:conn.commit()cursor.close()conn.close()# 执行sql
with mysql() as cursor:print(cursor)row_count = cursor.execute("select * from invoice")row_1 = cursor.fetchone()print(row_count, row_1)
 
 
二 Memcached 相关python操作

import memcache

class MemcachedUtils(object):
    def __init__(self,hostname,port):
        self.hostname = hostname
       
self.port = port

   
def getMemcachedClient(self,debug):
        #{'192.168.3.40:11211'} 或者['192.168.3.40:11211s']
        # _servers = {"%s:%s"%(self.hostname,self.port)}
       
_servers = ["%s:%s" % (self.hostname, self.port)]
        c = memcache.Client(_servers,debug=debug)
        return c



tools = MemcachedUtils("192.168.3.40",11211)
client = tools.getMemcachedClient(True)

# add(k,v):添加键值对, 不能添加重复的key,否则报错
# client.add('foo','bar')

# replace():
更新键值对,如果key存在则更新,不存在则异常
client.replace('foo','balance')

# set(k,v):设置键值对,相当于添加,和add的区别在于,它不存在则添加,存在则修改
client.set('dname','科技部')

# set_multiu({k1:v1,k2:v2}):设置多个键值对,它不存在则添加,存在则修改
client.set_multi({'food':'noodle','foo':'hah'})

# append(k): 修改指定key的值,在该值后面追加内容
client.append('foo','_end')
# prepend(k):修改指定key的值,在该值前面插入内容
client.prepend('foo','start_')

# incr(k): 根据k值使得对应值加1
client.incr('num')
# decr(k): 根据k值使得对应值减1
client.decr('num')

# delete(k): 根据key删除键值对
client.delete('foo')
# delete([k1,k2]): 根据多个key,删除多个键值对
client.delete_multi('x','y')

# get(k):查询键值对
client.get('foo')
# get_multi([k1,k2]):批量查询
client.get(['k1','k2'])

'''
每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,
会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如
果不想等,那表示在getscas执行之间,又有其他人执行了gets(获取了缓冲的指定值
),如此一来有可能出现非正常数据,则不允许修改。
'''
v = client.gets('product_count')
# ...
#
如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
client.cas('product_count', "899")

 
三 Redis

import redis

'''
redis-py
提供两个类RedisStrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,
并使用官方的语法和命令,RedisStrictRedis的子类,用于向后兼容旧版本的redis-py
'''

r = redis.Redis(host="192.168.3.40",port=6379)
r.set('foo','bar')
print(r.get('foo'))

'''
redis
连接池
redis-py
使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,
每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis
这样就可以实现多个Redis实例共享一个连接池。
'''
pool = redis.ConnectionPool(host='192.168.3.40', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'bar1')
print(str(r.get('foo'),'utf8'))

3.1 string操作

'''
set(name, value, ex=None, px=None, nx=False, xx=False)
ex
过期时间(秒)
px:
过期时间(毫秒)
nx:
当只有Name不存在时,当前set操作才执行
xx:
只有当Name存在时,才执行当前操作
'''
r.set('foo','bar2',ex=1,xx=True)
# 结果为bar2
print
(str(r.get('foo'),'utf8'))

r.set('foo','bar3',ex=1,nx=True)
# 结果为bar2
print
(str(r.get('foo'),'utf8'))

'''
setnx
name,value: 不能存在才修改
setex(name,value,time):
设置过期时间(秒)
psetex(name, time_ms, value)
设置过期时间(毫秒)
mset(*args, **kwargs):
批量设置值
mget
批量获取
mget('key1','key2') or mget(['key1','key2'])
getset(name, value):
设置新的值并获取原来的值
getrange(key, start, end)
'''
r.setnx('foo','bar3')
# 还是打印bar1,因为foo已经存在
print
(str(r.get('foo'),'utf8'))

r.setex('address','NewYork Royal Road #4',1)
print(str(r.get('address'),'utf8'))

r.psetex('message',1000,'haha')
print(str(r.get('message'),'utf8'))

r.mset(name="Nicky",age=28,gender="male",hobby=["booking","sporting"])
r.mset({'key1':'value1','key2':'value2'})

resultList = r.mget(['name','age'])
resultList = r.mget('gender','hobby')
for result in resultList:
    print(str(result,'utf8'))

x = r.getset('foo','new bar')
print(str(x,'utf8'))

 

# getrange(name,start,end): 根据字节获取子序列
r.set('name','郑涵月')
v = r.getrange('name',0,2) #

# setrange(name, offset, value):
修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
r.setrange('name',3,'欣寒')
v = r.get('name')

# strlen(name): 返回name对应值的长度
len = r.strlen('name') # 9

# incr(self, name, amount=1):
自增
r.set('age','28')
r.incr('age',amount=2) # age=30

# incrbyfloat(self, name, amount=1.0)
:浮点数自增
r.incrbyfloat('age',amount=2.5) # 32.5

# decr(self, name, amount=1)
自减
# r.decr('age',amount=2)

# append(key, value)
:在key值后面加上value
r.append('name',',我喜欢你')
print(str(r.get('name'),'utf-8'))

 
3.2 hash操作
对提供的字符串等进行hash操作,我们就可以根据得到的hash值去标记桶,只要hash值相同,就往这个桶里扔

# hset(key,field,value):将哈希表 key 中的字段 field 的值设为 value
r.hset('myhash','color','yellow')
r.hset('myhash','length','5')
r.hset('myhash','weight','20')

# hash表中批量设置键值对
r.hmset('myhash', {'country':'china','address':'四川成都'})

# hash表中根据key获取value
v = r.hget('myhash','address')

# hash表中批量获取
v = r.hmget('myhash',['color','length','length','weight'])

# hgetall(name):获取该hash表中所有value
v = r.hgetall('myhash')

# hlen(name) 获取hash表对应的hash中键值对的个数
print
(r.hlen('myhash'))

# hkeys(name): 获取name对应的hash中所有的key的值
# hvals(name):
获取name对应的hash中所有的value的值
r.hkeys('myhash')
r.hvals('myhash')

# hexists(name, key):检查name对应的hash是否存在当前传入的key
r.hexists('myhash','color')

# hdel(name,*keys)name对应的hash中指定key的键值对删除
r.hdel('myhahs',['k1','k2'])

# hincrby(name,key,amount=1):自增name对应的hash中的指定key的值,不存在则创建key=amount
# hincrbyfloat(name, key, amount=1.0)
同理

# hscan(name, cursor=0, match=None, count=None)
#
量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆
# name
redisname
# cursor
,游标(基于游标分批取获取数据)
# match
,匹配指定key,默认None 表示所有的key
# count
,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
#
如:
#
第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
#
第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
# ...
#
直到返回值cursor的值为0时,表示数据已经通过分片获取完毕

# hscan_iter(name, match=None, count=None)
:利用yield封装hscan创建生成器,实现分批去redis中获取数据
# match
,匹配指定key,默认None 表示所有的key
# count
,每次分片最少获取个数,默认None表示采用Redis的默认分片个数

 
3.3 list操作

r = redis.Redis(host="192.168.3.40",port=6379)

# lpush(name,values):name对应的list中添加元素,每个新的元素都添加到列表的最左边
# rpush(name, values)
表示从右向左操作
# r.lpush('numList',11,22,33)
# r.rpush('numList',44,55,66)

# lpushx(name,value):
# rpushx(name, value)
表示从右向左操作
# r.lpushx('numList',11)

# llen(name):name
对应的list元素的个数
v = r.llen('numList')

'''
linsert(name, where, refvalue, value)):
name对应的列表的某一个值前或后插入一个新值
#
参数:
name
redisname
where
BEFOREAFTER
refvalue
,标杆值,即:在它前后插入数据
value
,要插入的数据
'''

'''
r.lset(name, index, value):# r.lset(name, index, value):
# name
redisname
    # index
list的索引位置
    # value
,要设置的值
'''
r.lset('foo',2,'hah')

# r.lrem(name, value, num):name对应的list中删除指定的值
# name
redisname
    # value
,要删除的值
    # num
  num=0,删除列表中所有的指定值;
           # num=2,
从前到后,删除2个;
           # num=-2,
从后向前,删除2

# lpop(name):
name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
# rpop
:从右往左操作

# lindex(name, index)
:在name对应的列表中根据索引获取列表元素
elements = r.lindex('numList',11)

# lrange(name, start, end)
# name
redisname
# start
,索引的起始位置
# end
,索引结束位置
r.lrange('numList',0,2)
# ltrim(name, start, end):name对应的列表中移除没有在start-end索引之间的值
r.ltrim('NUMlIST',2,3)

# rpoplpush(src, dst):从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边
# src
,要取数据的列表的name
# dst
,要添加数据的列表的name
r.lpush('list1',20)
r.lpush('list2',10,30,40)
r.rpoplpush('list2','list1') # 结果就是[20 10]

#
从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧
# src
,取出并要移除元素的列表对应的name
# dst
,要插入元素的列表对应的name
# r.brpoplpush('list1', 'list2', timeout=2)

# r.pop(src)
从给定列表的末尾处取出一个元素,并从队列移除
# r.lpop('list2')
#
自定义增量迭代
def list_iter(r,name):
    count = r.llen(name)
    for index in range(count):
        yield r.lindex(name,index)

def foreach(r,name):
    elements = list_iter(r,name)
    for item in elements:
        print(str(item, "UTF-8") + " ")
  
foreach(r,'list1')
foreach(r,'list2')

 
3.4 set操作

'''
Set
集合就是不允许重复的列表
'''
# sadd(name,values):向集合添加元素
r.sadd('文科','语文','数学','英语','文综')
r.sadd('理科','语文','数学','英语','理综')

# scard(name):获取name对应的集合中元素个数
print
(r.scard('文科'))

# sdiff(keys, *args):在第一个name对应的集合中且不在其他name对应的集合的元素集合
elements = r.sdiff('文科','理科')
for i in elements:
    print(str(i,"UTF-8"))

# sdiffstore(dest, keys, *args)获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
r.sdiffstore('高一','文科','理科')

# sinter(keys, *args): 获取集合的交集
elements = r.sinter('文科','理科')
for i in elements:
    print(str(i,"UTF-8"))

# sinterstore(dest, keys, *args)获取多个集合交集,然后放到一个新的目的集合
r.sinterstore('高一','文科','理科')

# sunion(keys, *args):获取多个集合的并集
r.sunion('文科','理科')

# sunionstore(dest,keys, *args):获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
r.sunionstore('高三','文科','理科')

# sismember(name, value): 检查value是否是name集合成员
r.sismember('文科','语文')

# smembers(name)获取某个集合所有成员
r.smembers('理科')


# smove(src, dst, value):将某一个成员从src移到dest集合中
r.smove('文科','理科','文综')

for i in r.smembers('理科'):
    print(str(i,"UTF-8"))

# spop(name): 从集合的右侧(尾部)移除一个成员,并将其返回
r.spop('高一')

# srandmember(name, numbers):从name对应的集合中随机获取 numbers 个元素
r.srandmember('文科',2)

# name对应的集合中删除某些值
r.srem('高一', '语文','数学')

'''
用于增量迭代分批获取元素,避免内存消耗太大
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
'''
elements  = r.sscan('文科',count=2)
for item in elements[1]:
    print(str(item,'UTF-8'))

 
3.5 有序集合

'''
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,
所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
'''

# zadd(name, *args, **kwargs): 添加元素,且需要提供分数
r.zadd('animals','lions',1,'tiger',2,'bulk',3)
r.zadd('vegetables',tomato=1,eggplant=2)

# zcard(name):获取name对应的有序集合元素的数量
r.zcard('animals')

#zcount(name, min, max): 计算分数在[minmax]之间的元素的数量
r.zcount('animals',2,3)

# zincrby(name, value, amount):自增name对应的有序集合的 name 对应的分数
r.zincrby('vegetables','tomato',amount=40)

'''
可以按照范围获取数据
r.zrange( name, start, end, desc=False, withscores=False,score_cast_func=float)
# name
redisname
# start
,有序集合索引起始位置(非分数)
# end
,有序集合索引结束位置(非分数),如果为-1,表示区全部数据,-2表示倒数第2为止
# desc
,排序规则,默认按照分数从小到大排序
# withscores
,是否获取元素的分数,默认只获取元素的值
# score_cast_func
,对分数进行数据转换的函数
'''
#r.zadd('moutains','泰山',80,'黄山',85,'峨眉山',90,'衡山',75,'恒山',60,'武当山','75','青城山',70)
elements = r.zrange('moutains',start=0,end=-1,desc=True,score_cast_func=int)

'''
从大到小排序,不用我们自己指定是否降序
zrevrange(name, start, end, withscores=False, score_cast_func=float)
'''
elements = r.zrevrange('moutains',start=0,end=3,score_cast_func=int)

'''
zrangebyscore(name, min, max, start=None, num=None, withscores=False,score_cast_func=float)
按照分数范围获取name对应的有序集合的元素,自动排序
# name
redisname
# min:
最小分数值
# max:
最大分数值
# start:
从分数的结果哪一个索引位开始
# num:
从索引位开始取多少个数据
'''

elements = r.zrangebyscore('moutains',min=75,max=100,start=1,num=2,score_cast_func=int)

'''
从大到小排序
zrevrangebyscore(name, max, min, start=None, num=None, withscores=False,score_cast_func=float)
'''
elements = r.zrevrangebyscore('moutains',min=75,max=100,start=1,num=2,score_cast_func=int)


# zrank(name, value) 获取某个值在 name对应的有序集合中的排行,升序

# zrevrank(name, value)
,从大到小排序,降序
rank = r.zrank('moutains','武当山')
rank = r.zrevrank('moutains','武当山')

# zrem(name,values):删除name对应的有序集合中值是values的成员
r.zrem('moutains',['武当山','青城山'])

# zremrangebyrank(name, min, max):根据rank进行范围删除
elements = r.zremrangebyrank('moutains',min=0,max=2)

# zremrangebyscore(name, min, max):根据分数范围删除

elements = r.zremrangebyrank('moutains',min=60,max=70)

# zscore(name, value):获取name对应有序集合中 value 对应的分数
r.zscore('moutains','峨眉山')


'''
zinterstore(dest, keys, aggregate=None)
#
获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate
的值为:  SUM MIN  MAX

zunionstore(dest, keys, aggregate=None)
#
获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate
的值为:  SUM MIN  MAX
'''

'''
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
'''

 
3.6 其他命令

# 根据删除redis名字中的任意数据类型
r.delete('animal','animals')

# 检测redisname是否存在
v = r.exists('animals')

根据正则获取redisname
s = r.keys("*")

# 为某个redis的某个name设置超时时间
r.expire('mountains',1000)

# redisname重命名为
r.rename('animal', 'animals')

# move(name, db))redis的某个值移动到指定的db

#
随机获取一个redisname(不删除)
r.randomkey()

# type(name):获取name对应值的类型

 
3.7 管道
Redis python每一次执行请求都会在连接池申请连接,和归还连接,如果想要在一个请求中执行多个命令,你可以使用piepline,并且默认情况下一次pipline 是原子性操作。

pool = redis.ConnectionPool(host='192.168.3.40', port=6379)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
pipe.set('name', 'alex')
pipe.lpush('elements',12,3,4,5)
pipe.execute()

 
3.8 发布订阅

class MessageQueue:

    def __init__(self):
        self.__conn = redis.Redis(host="192.168.3.40",port=6379)
        self.subscribe_channel = 'music'
       
self.publish_channel = 'music'

   
def publish(self, msg):
        self.__conn.publish(self.publish_channel, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.subscribe_channel)
        pub.parse_response()
        return pub


mq = MessageQueue()
sub = mq.subscribe()
while True:
    msg = sub.parse_response()
    print(msg)

 
 
 

from db.tmp import MessageQueue

mq = MessageQueue()
mq.public('你好美')

 
 
四 RabbitMQ

import pika

'''
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
'''
class MQTools(object):
    def getConnection(self):
        params = pika.ConnectionParameters(host='192.168.3.40',port=22222)
        conn = pika.BlockingConnection(params)
        return conn

class Producer(object):
    def handle(self):
        tools = MQTools()
        conn = tools.getConnection()
        channel = conn.channel()
        channel.queue_declare(queue='animals')
        channel.basic_publish(exchange='',routing_key='animals',body='tiger')
        conn.close()

class Consumer(object):
    def handle(self):
        tools = MQTools()
        conn = tools.getConnection()
        channel = conn.channel()
        channel.queue_declare(queue='animals')
        channel.basic_consume(self.callback,queue='animals',no_ack=True)
        channel.start_consuming()

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)


'''
1
acknowledgment 消息不丢失
no-ack
False,如果消费者遇到情况(its channelis closed, connection is
closed, or TCP connection is lost)
挂掉了,那么,RabbitMQ会重新将该任务添加到队列中
'''
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.queue_declare(queue='animals')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='animals',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()

'''
2 durable  
消息不丢失
'''
# 生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='animal', durable=True)
channel.basic_publish(exchange='',routing_key='animal',
                      body='lions!',properties=pika.BasicProperties(delivery_mode=2, # make message persistent
                     
))
print(" [x] Sent'Hello World!'")
connection.close()

# 消费者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='hello',no_ack=False)

print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()

'''
3
、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取奇数序列的任务,消费者1去队列中
获取偶数序列的任务。channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
'''

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='hello',no_ack=False)
print(' [*] Waiting formessages. To exit press CTRL+C')
channel.start_consuming()

'''
4
、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。
所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
'''

# 发布者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent%r" % message)
connection.close()

# 订阅者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',queue=queue_name)

print(' [*] Waiting forlogs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

'''
5
、关键字发送
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,
即:队列绑定关键字,发送者将数据根据关键字发送到消息exchangeexchange根据关键字
 
判定应该将数据发送至指定队列。
'''
#生产者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(" [x] Sent%r:%r" % (severity, message))
connection.close()

# 消费者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

'''
6
、模糊匹配
exchange type = topic

topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange
exchange
将传入路由值关键字进行匹配,匹配成功,则将数据发送到指定队列
#
表示可以匹配 0 多个单词
表示只能匹配一个单词
'''
# 生产者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent%r:%r" % (routing_key, message))
connection.close()

# 消费者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.3.40'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage:%s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)

print(' [*] Waiting forlogs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()

 

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/93028.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息