基于Nonebot的班级消息管理机器人
项目介绍以及关于Nonebot
大二的时候,我在面对学校各种繁杂且混乱通知感到乏力,我想到开发一款以消息和时间作为基本数据,对qq(或者微信)中各种通知群的通知消息进行识别并进行管理。而这里的管理则是对消息进行重现,即在消息的deadline前进行多次提醒。
关于nonebot ,NoneBot 是一个 Python 的异步 QQ 机器人框架,允许开发者们以插件化的形式对 QQ 机器人收到的消息进行解析和处理来完成具体的功能。点击这里进入nonebot官网
Nonebot 以及 Python jdk的安装。
首先市面上各种qq、微信机器人都相当多,大可不必一定要使用Nonebot。而对于Nonebot开发者们可以使用各种各样的开发语言进行拓展。我这里使用到了python语言。pypi 最新的Python开发包
机器人项目成品
项目文件展示
这和机器人的版本更新有关,不一定是这样的。我的文件夹放的目录如下
介绍的两个py文件:
CQPlusHandler 中断处理函数(负责消息转发的基本流程控制)
YB_searchtime 时间处理函数 (其中有函数传入字符串,返回datetime接口)

CQPlusHandler 中断处理函数
首先中断函数处理,当机器人收到消息触发中断方式。这个文件处理消息的时候,进行一系列的消息处理。
首先最基本的定时发送功能
其次 在此之上的识别消息所带时间信息,并定时发送
以下是源代码
# -*- coding:utf-8 -*-
import cqplus,YB_searchtime,re,time,_thread, os ,random
from datetime import datetime,timedelta
from threading import Thread
#配置基本信息
revprivate = ()#监听qq名单
sendprivate = []
revgroup = []#要监听的qq消息群
sendgroup = [917292963] #要发送的qq消息群
f = open('msgdata.txt',mode='r+', encoding='UTF-8-sig')
msgdata = f.read()
msgdata = msgdata.split("\n" or '\\ufeff')
ksdtime = datetime.strptime("2019-12-23","%Y-%m-%d")
class MainHandler(cqplus.CQPlusHandler):
def handle_event(self, event, params):
#self event为收到消息事件 字符串
#params 为 事件的参数 dict 类型
if event=="on_private_msg":
if re.match("查询最近的事",params['msg']):
# self.api.send_private_msg(params['from_qq'], "触发了")
for temp in msgdata[::2]:
self.api.send_private_msg(params['from_qq'], temp)
if event=='on_group_msg':
if params['from_group'] in revgroup and params['from_group'] != 916367320:
self.api.send_group_msg(916367320, params['msg'])
for target_time in YB_searchtime.time_extract(params['msg']):
msgdata.append(target_time.strftime("%Y-%m-%d %H:%M:%S"))
msgdata.append(params['msg'])
f.seek(0)
f.truncate()
f.seek(0)
for i in msgdata:
if (i!=""):
f.write(i+"\n")
f.close()
for i in msgdata:
if ((msgdata.index(i)+1)%2 and i!=""):
try:
target_time = datetime.strptime(i, "%Y-%m-%d %H:%M:%S")
if target_time <= datetime.today():
a = msgdata.index(i)
del msgdata[a]
for temp in sendgroup:
pass
#self.api.send_group_msg(temp,msgdata[a])
del msgdata[a]
f.seek(0)
f.truncate()
f.seek(0)
for temp in msgdata:
if (i!="" and i!='\n'):
f.write(temp+"\n")
f.close()
except:
f.close()
# 这一部分为定时发送功能
q = open('D:\\机器人项目\\酷Q Air\\app\\cn.muxiaofei.coolq_sdk_x\\senddata.txt', mode='r+',
encoding='UTF-8-sig')
lz = open('D:\\机器人项目\\酷Q Air\\app\\cn.muxiaofei.coolq_sdk_x\\sendlz.txt', mode='r+',
encoding='UTF-8-sig')
lz = lz.read().split("\n" or '\\ufeff')
senddata = q.read()
senddata = senddata.split("\n" or '\\ufeff')
for i in senddata:
if ((senddata.index(i) + 1) % 2 and i != ""):
target_time = datetime.strptime(i, "%Y-%m-%d %H:%M:%S")
target_time = datetime.strptime(i, "%Y-%m-%d %H:%M:%S")
if target_time <= datetime.today():
a = senddata.index(i) + 1
for temp in sendgroup:
self.api.send_group_msg(temp, senddata[a])
if "每日一句" in senddata[a]:
self.api.send_group_msg(temp, "离考试还有:"+str((ksdtime-datetime.today()).days)+"天。\n"+lz[random.randint(0, 21)])
target_time = target_time + timedelta(days=1)
senddata[a-1] = target_time.strftime("%Y-%m-%d %H:%M:%S")
q.seek(0)
q.truncate()
q.seek(0)
for temp in senddata:
if (i != "" and i != '\n'):
q.write(temp + "\n")
q.close()
YB_searchtime 时间处理函数
这是整个插件项目的重头工作。我们需要将一段自然语言中提取出时间信息比如说有(‘明天下午’,‘今天两点’,‘两天后’,‘10月份底’,‘10月29日’)等等
如果您不只是想要时间提取的代码,我将细述这种算法代码的运行原理,如果您学过编译原理,那么这种算法和编译原理中提取关键字阶段的工作十分相似。
下来我把我使用的代码(对原来的代码有完善和修改,我找不到原创的作者)
# -*- coding:utf-8 -*-
import re
from datetime import datetime, timedelta
import jieba.posseg as psg
UTIL_CN_NUM = {
'零': 0, '一': 1, '二': 2, '两': 2, '三': 3, '四': 4,
'五': 5, '六': 6, '七': 7, '八': 8, '九': 9,
'0': 0, '1': 1, '2': 2, '3': 3, '4': 4,
'5': 5, '6': 6, '7': 7, '8': 8, '9': 9
}
UTIL_CN_UNIT = {'十': 10, '百': 100, '千': 1000, '万': 10000}
def cn2dig(src):
if src == "":
return None
m = re.match("\d+", src)
if m:
return int(m.group(0))
rsl = 0
unit = 1
for item in src[::-1]:
if item in UTIL_CN_UNIT.keys():
unit = UTIL_CN_UNIT[item]
elif item in UTIL_CN_NUM.keys():
num = UTIL_CN_NUM[item]
rsl += num * unit
else:
return None
if rsl < unit:
rsl += unit
return rsl
def year2dig(year):
res = ''
for item in year:
if item in UTIL_CN_NUM.keys():
res = res + str(UTIL_CN_NUM[item])
else:
res = res + item
m = re.match("\d+", res)
if m:
if len(m.group(0)) == 2:
return int(datetime.datetime.today().year / 100) * 100 + int(m.group(0))
else:
return int(m.group(0))
else:
return None
def parse_datetime(msg):
if msg is None or len(msg) == 0:
return None
#try:
# dt = parse(msg, fuzzy=True)
# return dt.strftime('%Y-%m-%d %H:%M:%S') #这段代码 原作者 减少的不好 这段代码可能会出现 年份问题,不如直接使用正则
# except Exception as e:
m = re.match(
r"([0-9零一二两三四五六七八九十]+年)?([0-9一二两三四五六七八九十]+月)?"
r"([0-9一二两三四五六七八九十]+[号日])?([上中下午晚早]+)?"
r"([0-9零一二两三四五六七八九十百]+[点:\.时])?"
r"([0-9零一二三四五六七八九十百]+分?)?([0-9零一二三四五六七八九十百]+秒)?",
msg)
if m.group(0) is not None: #,添加了一个try
try:
res = {
"year": m.group(1),
"month": m.group(2),
"day": m.group(3),
"hour": m.group(5) if m.group(5) is not None else '12',
"minute": m.group(6) if m.group(6) is not None else '00',
"second": m.group(7) if m.group(7) is not None else '00',
}
params = {}
for name in res:
if res[name] is not None and len(res[name]) != 0:
tmp = None
if name == 'year':
tmp = year2dig(res[name][:-1])
else:
tmp = cn2dig(res[name][:-1])
if tmp is not None:
params[name] = int(tmp)
target_date = datetime.today().replace(**params)
is_pm = m.group(4)
if is_pm is not None:
if is_pm == u'下午' or is_pm == u'晚上' or is_pm == u'中午':
hour = target_date.time().hour
if hour < 12:
target_date = target_date.replace(hour=hour + 12)
if target_date < datetime.today() and datetime.today().hour - target_date.hour <= 12:
target_date = target_date.replace(hour=target_date.time().hour + 12)
return target_date
except:
return None
else:
return None
def check_time_valid(word):
m = re.match("\d+$", word)
if m:
if len(word) <= 6:
return None
if int(word) > 30000000:
return None
word1 = re.sub('[号|日]\d+$', '日', word)
if word1 != word:
return check_time_valid(word1)
else:
return word1
# 时间提取
def time_extract(text):
time_res = [] # 提取出来的时间字符串放到这里
word = ''
keyDate = {'今天': 0, '明天': 1, '后天': 2, '下周': 7}
for k, v in psg.cut(text):
if k == "个" or v=="eng":
continue
# print(k,v)
if k in keyDate:
if word != '':
time_res.append(word)
word = (datetime.today() + timedelta(days=keyDate.get(k, 0))).strftime('%Y{y}%m{m}%d{d}').format(y='年',m='月',d='日')
elif word != '':
if v in ['m', 't']:
word = word + k
else:
time_res.append(word)
word = ''
elif v in ['m', 't']:
word = k
if word != '':
time_res.append(word)
result = list(filter(lambda x: x is not None, [check_time_valid(w) for w in time_res]))
print(result)
final_res = [parse_datetime(w) for w in result]
# print(final_res)
return [x for x in final_res if x is not None]
text1 = '希望明天大家做好整理;做好地面清洁;去除异味;收好违章电器;在八点半之前离开宿舍(不管有无课程都离开宿舍);'
text2 = '这是不忘初心,牢记使命主题教育活动,建议同学踊跃报名,人数不设上限,8点前报给班长,特别是思想上积极要求进步的同学'
text3 = '明天上午10点左右校领导参观书院,请全体同学做好迎接检查准备:打扫2个宿舍卫生,清理杂物垃圾,杜绝违章电器,靠近过道窗帘拉开。特别是洗脸池,要彻底打扫。'
print(text3,time_extract(text3), sep=';')
编译原理一些基本知识
这是编译器的一个工作流程,分为词法分析、语法分析、语义分析、中间代码生成、代码优化、目标代码生成。而我们要提取时间的工作,用到了前三项即词法分析、语法分析、语义分析。如:图1

词法分析将我们输入的自然语言转化为记号流。

语法分析则是对记号流进行句子的结构处理,生成语法树。

而语义分析则是要对语法进行‘任务’的先后排列,即可以看到任务运行的先后顺序。










