emma_tools_contents.txt•66.8 kB
Use the following files and contents for analysis:
README:
# emma_tools
收集测试过程中使用的一些常用的代码类等
## 说明
EMMA_Tools是我自己积累的一些内容,其中有来自网络的,有一些是自己总结的,都是平时工作需要用到的内容。
代码基于python3.x及其以上版本稳定,python2.x版本目前还不知道!
## 内容说明
| 文件名 | 功能描述 |
| ---- | ---- |
|dict_reduce_deep | 将一个深度非常深的嵌套非常多的dict变成深度为1的dict(无嵌套的) |
|get_dirAndfiles|获取各种目录文件|
|git_distance|计算两经纬度间的距离|
|global_menager|项目级的全局变量管理器,通过global定义全局字典,完成项目的全局变量的定义|
|httpstat|httpstat 应该是一个 python 封装后的 curl 工具能够展现 一些客户端连接网站的时间消耗,最近在看tls 感觉挺有用处的 简单学习一下|
|image_change|图片的转换类,包含了转换图片格式\改变图片高宽像素大小\添加噪音等|
|img2Base63|将图片变成base64编码,然后在web中打开,不用引入图片外部图片文件|
|info_hidden2img|将信息隐藏到图片中|
|init_test_json|生成测试要用的json|
|merge_audio|合并多个音频文件|
|pic2video|图片转成视频|
|qrcode4test|为测试生成二维码图片以及二维码图片信息内容解析|
|remover|删除文件或者目录|
|rotate_img|图片旋转|
|sende_mail|发送email|
|slipt_img|图片切割|
|stack|栈的操作|
|swagger2json|导出swagger到json文件,按照openapi的标准导出|
|test_string|测试需要处理字符串的类,可以通过左右边界找出全部符合的字符串|
|timer|计算耗时的装饰器封装|
|ureboot_processer|修改脚本后不用重启服务,调用watchdog不用启动识别修改|
|video_change|视视频修改码率的,为创建各种不同测试使用的视频文件做准备|
|video_merge_audio| 利用moviepy库,给视频添加一个背景音乐|
|zip_memless|低内存压缩|
Repository structure: emma_tools
/.vscode/
/.gitignore
/Img2Base64.py
/ReadMe.md
/__init__.py
/dict_reduce_deep.py
/get_dirAndfiles.py
/get_distance.py
/global_manager.py
/image_change.py
/info_hidden2img.py
/init_test_json.py
/merge_audio.py
/merge_img.py
/pic2video.py
/qrcode4test.py
/remover.py
/rotate_img.py
/send_email.py
/slipt_img.py
/stack.py
/swagger2json.py
/test_string.py
/timer.py
/ureboot_processer.py
/video_change.py
/video_merge_audio.py
/work_webchat_robot.py
/zip_memless.py
/.vscode/launch.json
File: /.gitignore
Content: Skipped binary file
File: /Img2Base64.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__from__ = 'EMMATools https://github.com/crisschan/EMMATools'
#__title__='PyCharm'
#__author__ = 'chancriss'
#__mtime__ = '20160908'
#__instruction__=' 将图片变成base64编码,然后在web中打开,不用引入图片外部图片文件'
import base64
import os
class Img2Base64(object):
'''
将图片变成base64编码,然后在web中打开,不用引入图片外部图片文件
'''
def __init__(self,sImgDir):
fr = open(sImgDir, 'rb')
self.__hexImg=base64.b64encode(fr.read())
fr.close()
@property
def HexImg(self):
return self.__hexImg.decode()
if __name__=='__main__':
img =Img2Base64('WechatIMG7.png')
print('data:image/bmp;base64,'+img.HexImg)
File: /ReadMe.md
Content:
# emma_tools
收集测试过程中使用的一些常用的代码类等
## 说明
EMMA_Tools是我自己积累的一些内容,其中有来自网络的,有一些是自己总结的,都是平时工作需要用到的内容。
代码基于python3.x及其以上版本稳定,python2.x版本目前还不知道!
## 内容说明
| 文件名 | 功能描述 |
| ---- | ---- |
|dict_reduce_deep | 将一个深度非常深的嵌套非常多的dict变成深度为1的dict(无嵌套的) |
|get_dirAndfiles|获取各种目录文件|
|git_distance|计算两经纬度间的距离|
|global_menager|项目级的全局变量管理器,通过global定义全局字典,完成项目的全局变量的定义|
|httpstat|httpstat 应该是一个 python 封装后的 curl 工具能够展现 一些客户端连接网站的时间消耗,最近在看tls 感觉挺有用处的 简单学习一下|
|image_change|图片的转换类,包含了转换图片格式\改变图片高宽像素大小\添加噪音等|
|img2Base63|将图片变成base64编码,然后在web中打开,不用引入图片外部图片文件|
|info_hidden2img|将信息隐藏到图片中|
|init_test_json|生成测试要用的json|
|merge_audio|合并多个音频文件|
|pic2video|图片转成视频|
|qrcode4test|为测试生成二维码图片以及二维码图片信息内容解析|
|remover|删除文件或者目录|
|rotate_img|图片旋转|
|sende_mail|发送email|
|slipt_img|图片切割|
|stack|栈的操作|
|swagger2json|导出swagger到json文件,按照openapi的标准导出|
|test_string|测试需要处理字符串的类,可以通过左右边界找出全部符合的字符串|
|timer|计算耗时的装饰器封装|
|ureboot_processer|修改脚本后不用重启服务,调用watchdog不用启动识别修改|
|video_change|视视频修改码率的,为创建各种不同测试使用的视频文件做准备|
|video_merge_audio| 利用moviepy库,给视频添加一个背景音乐|
|zip_memless|低内存压缩|
File: /__init__.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/5/20 12:24 下午
# @Author : CrissChan
# @Site : https://blog.csdn.net/crisschan
# @File : __init__.py.py
# @instruction_file:
# 1、get-pip.py:安装pip,python3 get-pip.py
# 2、httpstat.py:访问状态 python3 httpstat.py http://blog.csdn.net/crisschan
# 3、ureboot_processer.py:支持热修复, nohup ./UrebootProcesser xxxx.py &
# @instruction_module:
# 1、Dict2DeepSimpler:将一个深度非常深的dict变成深度为1的dict(无嵌套的)
# 2、DirAndFiles:获取各种目录文件:
# 3、GetDistance:计算两经纬度间的距离
# 4、ImageChange:所有图片类型的相互转换
# 5、Img2Base64:将图片变成base64编码,然后在web中打开,不用引入图片外部图片文件
# 6、SendeMail:发送邮件
# 7、SplitImg:图片分割
# 8、RotateImg:图片旋转
# 9、Stack:栈
# 10、InitTestJson:生成测试要用的json
# 11、TestString:测试需要处理字符串的类,类似LoadRunner的关联方法
# 12、Timer:计算耗时的装饰器封装
# 13、VideoChange:视频修改码率的,为创建各种不同测试使用的视频文件做准备
# 14、workWechat:计算耗时的装饰器封装
# MSGTYPE:message类型
# 15、ZipMemless:低内存压缩
# 16、VideoMergeAudio:合并视频和音频
from dict_reduce_deep import Dict2DeepSimpler
from get_dirAndfiles import DirAndFiles
from get_distance import GetDistance
from image_change import ImageChange
from Img2Base64 import Img2Base64
from info_hidden2img import InfoHidden2Img
from rotate_img import RotateImg
from send_mail import SendeMail
from slipt_img import SplitImg
from stack import Stack
from init_test_json import InitTestJson
from test_string import TestString
from timer import Timer
from video_change import VideoChange
from work_webchat_robot import workWechat, MSGTYPE
from zip_memless import ZipMemless
from remover import Remover
from video_merge_audio import VideoMergeAudio
from merge_audio import MergeAudio
File: /dict_reduce_deep.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__from__ = 'PythonSpace'
#__title__='PyCharm'
#__author__ = 'chancriss'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
#__mtime__ = '2018/1/4'
#__instruction__='将一个深度非常深的dict变成深度为1的dict(无嵌套的)'
class Dict2DeepSimpler(object):
def __init__(self,adict):
'''
Args:
adict: 输入需要降低深度的dict
'''
self.__dictResult=self.__deepSearchDict(adict)
def __deepSearchDict(self,adict):
dictResult = {}
for (k,v) in adict.items():
if type(v) is not dict:
dictResult=dict(dictResult, **{k:v})
else:
dictResult = dict(dictResult,**self.__deepSearchDict(v))
return dictResult
@property
def dictResult(self):
return self.__dictResult
'''adict = {'s':'ff','rr':{'r':'44'}}
aa= json.dumps(adict)
print type(aa)
print aa
dict2DeepSimpler = Dict2DeepSimpler(adict)
print dict2DeepSimpler.dictResult'''
File: /get_dirAndfiles.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__from__ = 'PythonSpace'
#__title__='PyCharm'
#__author__ = 'chancriss'
#__mtime__ = '2017/10/9'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
#__instruction__='获取各种目录文件'
import os
class DirAndFiles(object):
def __init__(self):
self.__list=[]
def allFiles(self,sOrgDir): # 传入存储的list
'''
Args:
sOrgDir: 目录
Returns:
list:全部文件,包含子目录内的
'''
self.__list=[]
for file in os.listdir(sOrgDir):
file_path = os.path.join(sOrgDir, file)
if os.path.isdir(file_path):
self.files(file_path)
else:
self.__list.append(file_path)
return self.__list
def dirAndFiles(self,sOrgDir):
'''
Args:
sOrgDir: 目录
Returns:
list[dict]:dict{'root':root,'dirs':dirs,'files':files},root是根目录、dirs root下的第一级子目录、files root下第一级文件
'''
self.__list=[]
for root, dirs, files in os.walk(sOrgDir):
self.__list.append({'root':root,'dirs':dirs,'files':files})
return self.__list
def filesWithFilter(self,sOrgDir,sEx):
'''
Args:
sOrgDir: 目录
sEx: 需要查找的扩展名
Returns:
list:符合扩展名的全部文件,包含子目录里面的
'''
self.__list=[]
for root, dirs, files in os.walk(sOrgDir):
for file in files:
if os.path.splitext(file)[1] == sEx:
self.__list.append(os.path.join(root, file))
return self.__list
if __name__=='__main__':
df = DirAndFiles()
#print df.files('/Users/chancriss/Downloads')
#print df.dirfiles('/Users/chancriss/Downloads')
#print df.filesfilter('/Users/chancriss/Downloads','.jpg')
File: /get_distance.py
Content:
#encoding=utf8
#!/usr/bin/env python
# __author_='crisschan'
# __data__='20160908'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
#__instruction__=计算两经纬度间的距离
from math import radians, cos, sin, asin, sqrt
class GetDistance(object):
def __init__(self,lon1, lat1, lon2, lat2): # 经度1,纬度1,经度2,纬度2 (十进制度数)
'''
计算两个地理坐标之间的距离
Args:
lon1: 经度1(十进制度数)
lat1: 纬度1(十进制度数)
lon2: 经度2(十进制度数)
lat2: 纬度2(十进制度数)
return:
返回之间的直线距离(米)
'''
# 将十进制度数转化为弧度
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine公式
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6371 # 地球平均半径,单位为公里
self.distance=c * r * 1000
'''
if __name__=='__main__':
print GetDistance(39.9,116.3,33.3,114.9).distance
'''
File: /global_manager.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/8/24 2:54 下午
# @Author : CrissChan
# @From :https://github.com/crisschan/
# @Site : https://blog.csdn.net/crisschan
# @File : global_manager.py
# @Intro : 项目级的全局变量管理器,通过global定义全局字典,完成项目的全局变量的定义
# 使用方法在对应的文件中:
# import global_manager as glob
# glob._init() # 先必须在主模块初始化
# # 定义跨模块全局变量
# glob.set_value('sessionid', sessionid)
# 在使用全局变量的项目内的文件前中:
# import global_manager as glob
# sessionid=glob.get_value('sessionid')
#
# 这就达到了项目将全局变量的目的
def _init():
'''
初始化全局变量管理器
:return:
'''
global _glo_dict
_glo_dict = {}
def set_value(key, value):
'''
将全局变量存入全局变量管理器
:param key: 全局变量的名字
:param value: 全局变量的值
:return:
'''
_global_dict[key] = value
def get_value(key):
'''
:param key: 全局变量的名字
:return:
'''
try:
return _global_dict[key]
except KeyError as e:
print(e)
File: /image_change.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/9/2 4:38 PM
# @Author : Criss Chan
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
# @File : image_chage.py
# @Software: PyCharm
# @instruction:所有图片的转换类
from PIL import Image
import os
from skimage import io
import random
import numpy as np
class ImageChange(object):
def __init__(self,image_file,image_path):
'''
:param image_file: 输入图片文件名
:param image_path: 输入图片存储路径,以/结尾
'''
self.image_file=image_file
self.image_path=image_path
def is_valid(self,img_path):
'''
判断文件是否为有效(完整)的图片
:param img_path:图片路径
:return:True:有效 False:无效
'''
bValid = True
try:
Image.open(img_path).verify()
except:
bValid = False
return bValid
def change_type(self,pic_type='.jpg'):
'''
转换图片格式
:param pic_type:图片类型,默认是jgp,支持png、gif等主流的图片格式
:return: True:成功 False:失败
'''
if self.is_valid(self.image_path+self.image_file):
# try:
str = self.image_file.rsplit(".", 1)
output_image= self.image_path+str[0] + pic_type
im = Image.open(self.image_path+self.image_file)
im=im.convert('RGB')
im.save(output_image)
return True
# except:
# return False
else:
return False
def change_size(self,new_hight,new_width,org_proportion=False):
'''
改变图片高宽像素大小
:param new_hight:新的 高的像素值
:param new_width: 新的宽的像素值
:param org_proportion: 是否维护远比例也就是原来图片的高/宽比例,如果选择了True,那么new_width不起作用
:return:True:成功 False:失败
'''
if self.is_valid(self.image_path + self.image_file):
str = self.image_file.rsplit(".", 1)
output_image = self.image_path + str[0]+'.' + str[1]
im = Image.open(self.image_path + self.image_file)
if org_proportion:
(width, hight) = im.size #读取当前长和宽
new_width = int(width * new_hight / hight)
out = im.resize((new_hight, new_width), Image.ANTIALIAS) # 以高质量转存新比例图片,
out.save(output_image)
return True
else:
return False
def slatpepper_noise(self) -> bool:
'''
给图片添加添加椒盐噪声,椒盐噪声也称为脉冲噪声,是图像常见的一种噪声,为随机出现的白点或者黑点,可能是亮的区域有黑色像素或是在暗的区域有白色像素(或是两者皆有)。
:return:True:成功 False:失败
'''
if self.is_valid(self.image_path + self.image_file):
str = self.image_file.rsplit(".", 1)
output_image = self.image_path + str[0] + '.' + str[1]
image = io.imread(self.image_path + self.image_file)
img = image.astype(np.uint16)
mu = 0
sigma = 10
for i in range(img.shape[0]):
for j in range(img.shape[1]):
for k in range(img.shape[2]):
img[i,j,k] = img[i,j,k] +random.gauss(mu=mu,sigma=sigma)
img[img>255] = 255
img[img<0] = 0
img = img.astype(np.uint8)
io.imsave(output_image, img)
return True
else:
return False
if __name__ == '__main__':
a = 1
while a<17:
if a/10 >=1:
in_file=str(a)+'.PNG'
else:
in_file = '0'+str(a) + '.PNG'
in_path = '/Users/crisschan/Downloads/'
ic = ImageChange(in_file,in_path)
print(ic.change_size(120,20,org_proportion=True))
a=a+1
File: /info_hidden2img.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__from__ = 'EMMATools'
#__title__='PyCharm'
#__author__ = 'chancriss'
#__mtime__ = '2018/4/9'
#__instruction__='将信息隐藏到图片中'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
from PIL import Image
import sys
class InfoHidden2Img(object):
def __init__(self,sImg):
self.image = Image.open(sImg)
def makeImageEven(self):
'''
取得一个 PIL 图像并且更改所有值为偶数(使最低有效位为 0)
:return: image对象
'''
if self.image.mode != "RGBA":
self.image = self.image.convert("RGBA")
pixels = list(self.image.getdata()) # 得到一个这样的列表: [(r,g,b,t),(r,g,b,t)...]
evenPixels = [(r >> 1 << 1, g >> 1 << 1, b >> 1 << 1, t >> 1 << 1) for [r, g, b, t] in pixels] # 更改所有值为偶数(魔法般的移位)
evenImage = Image.new(self.image.mode, self.image.size) # 创建一个相同大小的图片副本
evenImage.putdata(evenPixels) # 把上面的像素放入到图片副本
return evenImage
def constLenBin(slef,int):
'''
内置函数 bin() 的替代,返回固定长度的二进制字符串
:param int: 位数标记
:return: 二进制
'''
binary = "0" * (8 - (len(bin(int)) - 2)) + bin(int).replace('0b',
'') # 去掉 bin() 返回的二进制字符串中的 '0b',并在左边补足 '0' 直到字符串长度为 8
return binary
def encodeDataInImage(self, sSecret):
'''
将字符串编码到图片中
:param sSecret: 因此信息到图片
:return:
'''
evenImage = self.makeImageEven() # 获得最低有效位为 0 的图片副本
binary = ''.join(map(self.constLenBin, bytearray(sSecret, 'utf-8'))) # 将需要被隐藏的字符串转换成二进制字符串
if len(binary) > len(self.image.getdata()) * 4: # 如果不可能编码全部数据, 抛出异常
raise Exception("Error: Can't encode more than " + len(evenImage.getdata()) * 4 + " bits in this image. ")
encodedPixels = [(r + int(binary[index * 4 + 0]), g + int(binary[index * 4 + 1]), b + int(binary[index * 4 + 2]),
t + int(binary[index * 4 + 3])) if index * 4 < len(binary) else (r, g, b, t) for
index, (r, g, b, t) in enumerate(list(evenImage.getdata()))] # 将 binary 中的二进制字符串信息编码进像素里
encodedImage = Image.new(evenImage.mode, evenImage.size) # 创建新图片以存放编码后的像素
encodedImage.putdata(encodedPixels) # 添加编码后的数据
return encodedImage
def binaryToString(slef,binary):
'''
从二进制字符串转为 UTF-8 字符串
:param binary: 二进制文件
:return: 字符串
'''
index = 0
string = []
fun = lambda x, i: x[0:8]
while index + 1 < len(binary):
chartype = binary[index:].index('0') # 存放字符所占字节数,一个字节的字符会存为 0
if chartype == 0:
chartype = 1
length = chartype * 8
for i in range(chartype):
ascode = int(binary[index + i * 8:index + i * 8 + 8], 2)
string.append(chr(ascode))
index += length
return ''.join(string)
def decodeImage(self):
'''
解码隐藏数据
:return: 返回隐藏的信息
'''
pixels = list(self.image.getdata()) # 获得像素列表
binary = ''.join([str(int(r >> 1 << 1 != r)) + str(int(g >> 1 << 1 != g)) + str(int(b >> 1 << 1 != b)) + str(
int(t >> 1 << 1 != t)) for (r, g, b, t) in pixels]) # 提取图片中所有最低有效位中的数据
# 找到数据截止处的索引
locationDoubleNull = binary.find('0000000000000000')
endIndex = locationDoubleNull + (
8 - (locationDoubleNull % 8)) if locationDoubleNull % 8 != 0 else locationDoubleNull
data = self.binaryToString(binary[0:endIndex])
return data
if __name__=='__main__':
# infoHidden2Img= InfoHidden2Img('1.jpg')
# infoHidden2Img.encodeDataInImage('I am CrissChan. I want to build the DevOps.I am the king in this area1').save('2.png')
infoHidden2Img2 = InfoHidden2Img('/Users/crisschan/PycharmProjects/emma_tools/1.png')
print(infoHidden2Img2.decodeImage())
File: /init_test_json.py
Content:
#encoding=utf8
#!/usr/bin/env python
# __author_='crisschan'
# __data__='20161130'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
# __instruction__生成测试要用的json
from stack import Stack
class InitTestJson(object):
'''
subJsonLoop:将给定的json字符串中某一个子节点或者数组重复后,组成新json返回
主要服务于json的测试用例生成
'''
def __init__(self,strJsonTemplate):
'''
Args:
strJsonTemplate: 要做处理的json原文件
'''
self.strJsonTemplate=str(strJsonTemplate)
#print self.strJsonTemplate
#self.arrNumberlist=arrNumberlist
self.__Parsing()
def __PreFirst(self,nPos,sTarget):
'''
找到字符串中nPos之前的第一次sTarget出现的位置
Args:
nPos: 位置
sTarget: 目标字符
Returns:
'''
while nPos>=0:
nPos = nPos - 1
if self.strJsonTemplate[nPos]==sTarget or self.strJsonTemplate[nPos]=='{':
return nPos
def __Parsing(self):
'''
按照栈的方式分析大括号和中括号的位置
Returns:
'''
self.listbrace=[]#大括号
self.listbrackets=[]#小括号
sCharTemp=Stack()#存大括号和中括号({【)
#sIntTemp=Stack()#位置({)
nPos=0
for aTemp in self.strJsonTemplate:
#print aTemp
if aTemp == '{':
dictTemp = {nPos:'{'}
sCharTemp.push(dictTemp)
elif aTemp=="[":
dictTemp = {nPos: '['}
sCharTemp.push(dictTemp)
elif aTemp==']':
keyTemp = sCharTemp.pop()
if keyTemp.values()[0] == '[':
sPos = keyTemp.keys()[0]
sign=(sPos,nPos+1)
self.listbrackets.append(sign)
#print self.listbrackets
elif aTemp=='}':
keyTemp = sCharTemp.pop()
if keyTemp.values()[0] == '{':
sPos = keyTemp.keys()[0]
sign = (sPos, nPos+1)
self.listbrace.append(sign)
#print self.listbrace
nPos=nPos+1
self.listbrace=sorted(self.listbrace, key=lambda x: x[0])
self.listbrackets = sorted(self.listbrackets, key=lambda x: x[0])
def createTestJson(self,Type,number,count):
'''
Args:
Type: {或者[两中输入
number: type的字符第几次出现
count: 要重复的次数(结果json要重复几次number的位置)
Returns:
'''
if count<=0:
return 0
else:
if Type=='{':
if number <= 0:
return 0
startpos = self.listbrace[number][0]
endpos = self.listbrace[number][1]
prestr=self.strJsonTemplate[0:endpos]
poststr=self.strJsonTemplate[endpos:]
nkey=self.__PreFirst(startpos,',')+1
strRe = prestr
while count-1>0:
strRe=strRe+','+self.strJsonTemplate[nkey:endpos]
count=count -1
strRe=strRe+poststr
return strRe
elif Type=='[':
if number < 0:
return 0
startpos = self.listbrackets[number][0]+1
endpos = self.listbrackets[number][1]-1
prestr = self.strJsonTemplate[0:endpos ]
poststr = self.strJsonTemplate[endpos + 1:]
strRe = prestr
while count-1 > 0:
strRe = strRe +','+ self.strJsonTemplate[startpos:endpos]
count = count - 1
strRe = strRe +']'+ poststr
return strRe
else:
return 0
if __name__=='__main__':
data3 = {'a': 123, 'b': 789, 'c': 456, 'd': {'b': 7897, 'c': 4567, 'a': 1237}}
data1 = {'i':{'a': 123, 'b': 789},'k':{'c': 456, 'd': {'b': 789, 'c': 456, 'a': 123}}}
data2 = {'a': 123, 'b': 789,'c': [{'a': 123, 'b': 789}]}
I1= InitTestJson(data2)
print(I1.createTestJson('[',0,4))
#print str(data1)[0:81]
File: /merge_audio.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : try_audio.py
@Time : 2023/01/16 10:03:00
@Author : CrissChan
@Version : 1.0
@Site : https://blog.csdn.net/crisschan
@Desc : 合并音频文件
'''
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.editor import concatenate_audioclips
class MergeAudio(object):
def __init__(self,audio_list,output_audio='final_audio.mp3') -> None:
'''
@des :构造函数
@params :audio_list,存储音频文件绝对地址的列表
output_audio,默认值为final_audio.mp3的输出文件
@return :none
'''
self.audio_list = audio_list
self.output_audio = output_audio
pass
def merge(self):
'''
@des :合并音频文件
@params :audio_list,存储音频文件绝对地址的列表
@return :none
'''
# 合并音频文件
audio_clip_list =[]
for i in range(len(self.audio_list)):
audio_clip = AudioFileClip(self.audio_list[i])
audio_clip_list.append(audio_clip)
final_clip = concatenate_audioclips(audio_clip_list)
final_clip.write_audiofile(self.output_audio)
if __name__ == '__main__':
alist = ['1.flac','2.flac','3.flac','4.flac','5.flac']
merge_audio = MergeAudio(alist)
merge_audio.merge()
File: /merge_img.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : merge_img.py
@Time : 2024/03/04 13:59:42
@Author : CrissChan
@Version : 1.0
@Site : https://blog.csdn.net/crisschan
@Desc : 将多张图片合并成一个图片
'''
import numpy as np
import cv2
class MergeImg(object):
def __init__(self,img_file_list,output_img_file="output_finale.png") -> None:
'''
@des :构造函数
@params :
img_file_list:存储图片文件绝对地址的列表
output_img:输出文件的地址,默认值为output_finale.png
'''
self.img_file_list = img_file_list
self.output_img_file = output_img_file
def merge(self):
'''
@des :合并img_file_list里面的图片,结果存为output_img_file
@params :无
'''
imgs = []
aimg =""
for aimg in self.img_file_list:
img = cv2.imread(aimg)
imgs.append(img)
merge_img = np.vstack(imgs)
cv2.imwrite(self.output_img_file,merge_img)
if __name__ == '__main__':
alist=['./from/1.png','./from/2.png','./from/3.png']
mi = MergeImg(alist)
mi.merge()
File: /pic2video.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : try_pic_video.py
@Time : 2023/01/16 11:44:30
@Author : CrissChan
@Version : 1.0
@Site : https://blog.csdn.net/crisschan
@Desc : 图片转换成视频
'''
from moviepy.editor import *
from get_dirAndfiles import DirAndFiles
class Pic2Video:
def __init__(self, pics_path, pic_type = '.png', out_file_path='video_tmp.mp4', fps=24, size=(1280, 720),duration = 10):
'''
@des : 初始化函数
@params :pics_path存储png图片的目录
pic_type存储图片的后缀名(默认是PNG)
@params :out_file_path输出视频的路径,默认video_tmp
@params :fps视频的帧率,默认24
@params :size视频的尺寸,默认1280*720
@params :duration视频的时长,默认10秒
@return :none
'''
self.pics_path = pics_path
self.out_file_path = out_file_path
self.fps = fps
self.size = size
self.duration = duration
self.pic_type = pic_type
def pic2video(self):
'''
@des :# 创建视频文件
@params :none
@return : none
'''
df = DirAndFiles()
files_list = df.filesWithFilter(self.pics_path, self.pic_type)
clips = [ImageClip(m).set_duration(self.duration) for m in files_list]
# 获取视频的尺寸
concat_clip = concatenate_videoclips(clips, method="compose")
concat_clip.write_videofile(self.out_file_path, fps=self.fps)
if __name__ == '__main__':
Pic2Video('/Users/crisschan/workspace/PySpace/try/pic',fps=24,size=(1280,720)).pic2video()
File: /qrcode4test.py
Content:
from email import header
from email.mime import image
from ensurepip import version
from importlib import import_module
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : qrcode4test.py
@Time : 2022/09/19 16:13:10
@Author : CrissChan
@Version : 1.0
@Site : https://blog.csdn.net/crisschan
@Desc : 为测试封装的二维码生成、识别的class
'''
import os
from enum import Enum
import qrcode
from PIL import Image
import zxing
class Error_Coorrect(Enum):
'''错误的Enum类'''
# L 最多可以矫正7%的错误
L=qrcode.constants.ERROR_CORRECT_L
# M 最多可以矫正15%的错误,这个是默认值
M=qrcode.constants.ERROR_CORRECT_M
# Q 最多可以矫正25%的错误
Q=qrcode.constants.ERROR_CORRECT_Q
# H 最多可以矫正30%的错误
H=qrcode.constants.ERROR_CORRECT_H
class QRcodeEncode(object):
''' 生成二维码的类,
qr = QRcodeEncode(content='ffffff')
qr.encode_qrcode()
二维码的属性可以通过setter设置'''
def __init__(self,save_path=None,content=None) -> None:
'''
@des : QRcodeEncode的构造函数
@params :save_path 二维码存储地址,如果没有就直接打开
content 需要存储到二维码中的信息
@return :无
'''
self.__save_path=save_path
self.__content = content
self.__version = 2
self.__error_correction = Error_Coorrect.M.value
self.__box_size=8 # 每一个二维码中的box占据的像素
self.__border = 4 # 控制QR Code的空白边距大小,默认值为4
self.__fill_color=(55, 95, 35)# 填充色
self.__back_color = (255, 195, 235) # 背景色
def encode_qrcode(self):
'''
@des :生成二维码
@params :无
@return :在save_path中设置的位置存储突破,如果该参数是None,直接打开二维码图片
'''
encode_qr = qrcode.QRCode(version = self.__version,error_correction=self.__error_correction,box_size=self.__box_size,border=self.__border)
encode_qr.add_data(data = self.__content)
encode_qr.make(fit=True)
img = encode_qr.make_image(fill_color = self.__fill_color,bakc_color = self.__back_color)
if self.__save_path:
img.save(self.__save_path)
else:
img.show()
@property
def version(self):
return self.__version
@version.setter
def version(self,version):
if 40-version<=0:
self.__version = version
else:
print('版本取在1-40之间的整数')
@property
def error_correction(self):
return self.__error_correction
@error_correction.setter
def error_correction(self,error_correction):
if any(error_correction == e.key for e in Error_Coorrect):
self.__error_correction=error_correction
else:
print('error_correction 是 枚举类型Error_Coorrect')
@property
def box_size(self):
return self.__box_size
@box_size.setter
def box_size(self,box_size):
self.__box_size = box_size
@property
def border(self):
return self.__border
@border.setter
def boder(self,boder):
self.__border = boder
@property
def fill_color(self):
return self.__fill_color
@fill_color.setter
def fill_color(self,fill_color):
self.__fill_color = fill_color
@property
def back_color(self):
return self.__back_color
@back_color.setter
def back_color(self,back_color):
self.__back_color = back_color
class QRCodeDecode(object):
def __init__(self,image_path = None ) -> None:
self.__image_path = image_path
pass
def decode_qrcode(self):
if not os.path.exists(self.__image_path):
raise FileExistsError(self.__image_path)
qr = zxing.BarCodeReader()
qrinfo = qr.decode(self.__image_path)
return qrinfo.parsed
# if __name__ == '__main__':
# print(Error_Coorrect.M.value)
# print( qrcode.constants.ERROR_CORRECT_M)
# qr = QRcodeEncode(content='ffffff')
# qr.encode_qrcode()
# qrde=QRCodeDecode('2.png')
# print(qrde.decode_qrcode())
File: /remover.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/5/30 10:23 上午
# @Author : CrissChan
# @Site : https://blog.csdn.net/crisschan
# @File : remove.py
# @Intro : 删除文件或者目录
import os
import shutil
class Remover(object):
@classmethod
def dir(cls,rm_root):
'''
递归删除目录以及目录内的所有内容.
:param root:删除目录
:return:
'''
for root, dirs, files in os.walk(rm_root, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
os.removedirs(rm_root)
@classmethod
def dir_under(cls,rm_root):
'''
:param rm_root:删除目录下所有的文件,目录不删除存在
:return:
'''
try:
shutil.rmtree(rm_root)
os.makedirs(rm_root)
except:
return
@classmethod
def file(cls,rm_file):
'''
删除文件
:param root:删除文件路径
:return:
'''
if os.path.exists(rm_file):
os.unlink(rm_file)
@classmethod
def dir_empty(cls,rm_root):
'''
递归删除目录,如果有一个目录非空则会抛出异常
:param rm_root: 删除目录
:return:
'''
if os.path.exists(rm_root):
os.removedirs(rm_root)
File: /rotate_img.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__from__ = 'PythonSpace'
#__title__='PyCharm'
#__author__ = 'chancriss'
#__mtime__ = '2017/10/9'
#__instruction__='图片旋转'
from PIL import Image
from numpy import *
class RotateImg(object):
def __init__(self,sOrgImgFile,sTargetImgFile):
'''
Args:
sOrgImgFile: 原始图片地址
sTargetImgFile: 转存图片地址
'''
self.sOrgImgFile=sOrgImgFile
self.sTargetImgFile=sTargetImgFile
def rotate(self,nRotate=0):
'''
按照nRotate旋转
Args:
nRotate: 旋转角度-360 到 360 之间
Returns:
'''
self.nRotate = nRotate
pil_im = Image.open(self.sOrgImgFile)
pil_im = pil_im.rotate(self.nRotate)
sTargetRotate = self.sTargetImgFile[:self.sTargetImgFile.find('.')]+'_rotate'+str(nRotate)+self.sTargetImgFile[self.sTargetImgFile.find('.'):]
pil_im.save(sTargetRotate)
def Counterclockwise90(self):
'''
逆时针旋转90度
Returns:
'''
pil_im = Image.open(self.sOrgImgFile)
dx, dy = pil_im.size
pil_im=pil_im.transpose(Image.ROTATE_90)
pil_im = pil_im.resize((dy, dx))
sTargetRotate = self.sTargetImgFile[:self.sTargetImgFile.find('.')] + '_Counterclockwise90' + self.sTargetImgFile[self.sTargetImgFile.find('.'):]
pil_im.save(sTargetRotate)
def Clockwise90(self):
'''
顺时针旋转90度
Returns:
'''
pil_im = Image.open(self.sOrgImgFile)
dx,dy=pil_im.size
pil_im=pil_im.transpose(Image.ROTATE_270)
pil_im=pil_im.resize((dy,dx))
sTargetRotate = self.sTargetImgFile[:self.sTargetImgFile.find('.')] + '_Clockwise90' + self.sTargetImgFile[self.sTargetImgFile.find('.'):]
pil_im.save(sTargetRotate)
def ToptoBottom(self):
pil_im = Image.open(self.sOrgImgFile)
pil_im=pil_im.transpose(Image.FLIP_TOP_BOTTOM)
sTargetRotate = self.sTargetImgFile[:self.sTargetImgFile.find('.')] + '_ToptoBottom' + self.sTargetImgFile[self.sTargetImgFile.find('.'):]
pil_im.save(sTargetRotate)
def LefttoRight(self):
pil_im = Image.open(self.sOrgImgFile)
pil_im=pil_im.transpose(Image.FLIP_LEFT_RIGHT)
sTargetRotate = self.sTargetImgFile[:self.sTargetImgFile.find('.')] + '_LefttoRight' + self.sTargetImgFile[self.sTargetImgFile.find('.'):]
pil_im.save(sTargetRotate)
# if __name__=='__main__':
# ri = RotateImg('0001.jpg','5.jpg')
# ri.LefttoRight()
# ri.ToptoBottom()
# ri.Clockwise90()
# ri.Counterclockwise90()
# ri.rotate(nRotate=45)
File: /send_email.py
Content:
# !/usr/bin/python
#coding:utf-8
# __author_='crisschan'
# __data__='20160908'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
# __instruction__=发送邮件
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
import time
class SendeMail(object):
def __init__(self,sender,receiver,subject,username,password,stmpconnect,transMsg):
'''
:param sender: 发送邮箱
:param receiver: 接收邮箱
:param subject: 邮件标题
:param username: 发送邮箱用户名
:param password: 发送邮箱密码
:param transMsg: 发送正文
:param stmpconnect stmp地址
'''
self.sender = sender
self.receiver = receiver
self.subject = subject
self.username = username
self.password =password
self.transMsg = transMsg
self.stmpconnect = stmpconnect
def _pickupMsgPlain(self):
self.msg = MIMEText(self.transMsg, _subtype='plain', _charset='utf-8') # 中文需参数‘utf-8’,单字节字符不需要
self.msg['Subject'] = Header(self.subject, 'utf-8')
self.msg['from'] = self.sender
self.msg['to'] = self.receiver
def _pickupMsgHTML(self):
self.msg = MIMEText(self.transMsg, _subtype='html', _charset='utf-8') # 中文需参数‘utf-8’,单字节字符不需要
self.msg['Subject'] = Header(self.subject, 'utf-8')
self.msg['from'] = self.sender
self.msg['to'] = self.receiver
def _pickupMsgAtt(self):
self.msg = MIMEMultipart()
for attfile in self.attfiles:
att = MIMEText(open(attfile, 'rb').read(), 'base64', 'gb2312')
att["Content-Type"] = 'application/octet-stream'
att["Content-Disposition"] = 'attachment; filename='+attfile.split('/')[-1] # 这里的filename可以任意写,写什么名字,邮件中显示什么名字
self.msg.attach(att)
self.msg['Subject'] = Header(self.subject, 'utf-8')
self.msg['from'] = self.sender
self.msg['to'] = self.receiver
def send(self,sendMode='plain',attfiles=[]):
'''
:param sendMode: email send mode
plain is text
html is webpage
att is have attfile
:param attfiles: is a list save the attfiles absolute address
:return:
'''
if sendMode=='plain':
self._pickupMsgPlain()
elif sendMode=='HTML':
self._pickupMsgHTML()
elif sendMode=='att':
if len(attfiles)<1:
return 0
else:
self.attfiles = attfiles
self._pickupMsgAtt()
try:
smtp = smtplib.SMTP()
smtp.connect(self.stmpconnect)
smtp.login(self.username, self.password)
smtp.sendmail(self.sender, self.receiver, self.msg.as_string())
smtp.quit()
return 1
except Exception as e:
return e
'''
if __name__=='__main__':
sender = '2053581240@qq.com'
receiver='148065025@qq.com'
subject = 'python email test'
smtpserver = 'smtp.qq.com'
username = '2053581240'
password = ''
stmpconnect = 'smtp.qq.com'
transMsg = '等大大发发'+str(time.time())
attfile = ['/Users/chancriss/Desktop/work/95.pic.jpg','/Users/chancriss/Desktop/work/95.pic.jpg','/Users/chancriss/Desktop/work/95.pic.jpg']
tr = sendemail(sender,receiver,subject,username,password,stmpconnect,transMsg)
print tr.send(sendMode='att',attfiles=attfile)
'''
File: /slipt_img.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__from__ = 'PythonSpace'
#__title__='PyCharm'
#__author__ = 'chancriss'
#__mtime__ = '2017/11/21'
#__instruction__='图片分割'
import os
from PIL import Image
class SplitImg(object):
def __init__(self,src, rownum, colnum, dstpath):
'''
Args:
src: 原图片绝对地址
rownum: 分割行数
colnum: 分割列数
dstpath: 输出目录
'''
img = Image.open(src)
w, h = img.size
if rownum <= h and colnum <= w:
print('Original image info: %sx%s, %s, %s' % (w, h, img.format, img.mode))
print('开始处理图片切割, 请稍候...')
s = os.path.split(src)
if dstpath == '':
dstpath = s[0]
fn = s[1].split('.')
basename = fn[0]
ext = fn[-1]
num = 0
rowheight = h // rownum
colwidth = w // colnum
for r in range(rownum):
for c in range(colnum):
box = (c * colwidth, r * rowheight, (c + 1) * colwidth, (r + 1) * rowheight)
img.crop(box).save(os.path.join(dstpath, basename + '_' + str(num) + '.' + ext), ext)
num = num + 1
print('图片切割完毕,共生成 %s 张小图片。' % num)
else:
print('不合法的行列切割参数!')
# if __name__=="__main__":
# src = '/Users/chancriss/Downloads/1/1.png'
# if os.path.isfile(src):
# dstpath = '/Users/chancriss/Downloads/1/'
# if (dstpath == '') or os.path.exists(dstpath):
# row = 3
# col = 4
# if row > 0 and col > 0:
# SplitImg(src, row, col, dstpath)
# else:
# print('无效的行列切割参数!')
# else:
# print('图片输出目录 %s 不存在!' % dstpath)
# else:
# print('图片文件 %s 不存在!' % src)
File: /stack.py
Content:
#encoding=utf8
#!/usr/bin/env python
# __author_='crisschan'
# __data__='20161130'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
# __instruction__=栈的操作
class Stack(object):
def __init__(self):
self.items=[]
def isEmpty(self):
'''
返回是否为空
Returns: true 为空
false 不为空
'''
return len(self.items)==0
def push(self,item):
'''
入栈
Args:
item: 入栈的数据
Returns:null
'''
self.items.append(item)
def pop(self):
'''
出栈
Returns:null
'''
return self.items.pop()
def peek(self):
'''
查看栈顶对象而不移除它
Returns: 栈顶元素
'''
if not self.isEmpty():
return self.items[len(self.items)-1]
def size(self):
'''
计算站大小
Returns:长度
'''
return len(self.items)
'''
if __name__=="__main__":
s=Stack()
print s.isEmpty()
s.push('1')
print s.peek()
s.push(444)
print s.items
print s.size()
print s.pop()
print s.size()
'''
File: /swagger2json.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/8/2
# @Author : CrissChan
# @Site : https://blog.csdn.net/crisschan
# @File : swagger2json.py
# @Porject: 在线的swagger(V2)的api文档的内容保存成离线的json格式文件,并提供版本之间的diff功能
import requests
import json
import re
from os import path
import os
import errno
from enum import Enum
import shutil
class Type(Enum):
# new 新建,第一次使用
# rewrite = 覆盖
new = 1
rewrite = 2
class Swagger2Json(object):
def __init__(self, url, out_path,type=Type.new):
'''
url : swagger 的json路径,类似v2/api-docs
out_path:输出路径
type : Type枚举类型
'''
self.url = url
self.out_path = out_path
if type == Type.new:
self.__new_json_files()
elif type == Type.rewrite:
self.__rewrite_jsonfile()
def __make_dir(self, dir_path=None):
'''
新建目录
'''
if dir_path is None:
dir_path = self.out_path
if not path.exists(dir_path):
try:
os.mkdir(dir_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def __new_json_files(self):
'''
存储全部swagger的json到一个swagger.json文件,并调用单接口的json文件保存接口
'''
if self.__get_swagger_res():
# save swagger information by json file and save the out_path root path
self.__make_dir()
with open(path.join(self.out_path, 'cri.json'), 'w', encoding='utf-8') as f:
json.dump(self.res_json, f, ensure_ascii=False)
self.__get_api_json()
def __get_api_json(self):
'''
保存controller下的api的json到文件,并按照controller结构存储
'''
api_path = path.join(self.out_path, 'api')
self.__make_dir(api_path)
tags = self.res_json['tags'] # tags save all controller name
for tag in tags:
tag_name = tag['name']
tag_dir = path.join(api_path, tag_name)
self.__make_dir(tag_dir)
apis = self.res_json['paths'] # tags save all api uri
for api in apis:
if tag_name in json.dumps(apis[api], ensure_ascii=False):
api_file = path.join(tag_dir, api.replace('/', '_') + '.json')
with open(api_file, 'w', encoding='utf-8') as f:
json.dump(apis[api], f, ensure_ascii=False)
def __get_swagger_res(self):
'''
将swagger的json存储在memery中
'''
is_uri = re.search(r'https?:/{2}\w.+$', self.url)
if is_uri:
try:
res_swagger = requests.get(self.url)
except:
raise Exception('[ERROR] Some error about {}'.format(self.url))
if res_swagger.status_code == 200:
self.res_json = res_swagger.json()
if self.res_json['swagger'] == '2.0':
return True
else:
return False
else:
return False
else:
return False
def __rewrite_jsonfile(self):
'''
覆盖,清空后重新生成
'''
shutil.rmtree(self.out_path, ignore_errors=True)
self.__new_json_files()
if __name__ == '__main__':
url =''
out_patj='./jsonfile/'
sw = Swagger2Json(url,out_patj,type=Type.new)
File: /test_string.py
Content:
# coding=utf8
# !/usr/bin/env python
# __author_='crisschan'
# __data__='20160908'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
# __instruction__= 测试需要处理字符串的类:
# 修改了方法添加了@classmethod装饰器
import random
import re
class TestString(object):
def __GetMiddleStr(self,content, startPos, endPos):
'''
:根据开头和结尾字符串获取中间字符串
:param content:原始string
:param startPos: 开始位置
:param endPos: 结束位置
:return: 一个string
'''
# startIndex = content.index(startStr)
# if startIndex >= 0:
# startIndex += len(startStr)
# endIndex = content.index(endStr)
return content[startPos:endPos]
def __Getsubindex(self,content, subStr):
'''
:param content: 原始string
:param subStr: 字符边界
:return: 字符边界出现的第一个字符的在原始string中的位置 []
'''
alist = []
asublen = len(subStr)
sRep = ''
istep = 0
while istep < asublen:
if random.uniform(1, 2) == 1:
sRep = sRep + '~'
else:
sRep = sRep + '^'
istep = istep + 1
apos = content.find(subStr)
while apos >= 0:
alist.append(apos)
content = content.replace(subStr, sRep, 1)
apos = content.find(subStr)
return alist
@classmethod
def GetTestString(cls_obj,content, startStr, endStr):
'''
:param content: 原始string
:param startStr: 开始字符边界
:param endStr: 结束字符边界
:return: 前后边界一致的中间部分字符串 []
'''
reStrList = []
if content is None or content=='':
return reStrList
if startStr!='' and content.find(startStr)<0:
startStr=''
if endStr!='' and content.find(endStr)<0:
endStr=''
if startStr=='':
reStrList.append(content[:content.find(endStr)])
return reStrList
elif endStr=='':
reStrList.append(content[content.find(startStr)+len(startStr):])
return reStrList
elif startStr=='' and endStr=='':
reStrList.append(content)
return reStrList
else:
starttemplist = cls_obj().__Getsubindex(content, startStr)
nStartlen = len(startStr)
startIndexlist = []
for ntemp in starttemplist:
startIndexlist.append(ntemp + nStartlen)
endIndexlist = cls_obj().__Getsubindex(content, endStr)
astep = 0
bstep = 0
dr = re.compile(r'<[^>]+>', re.S)
while astep < len(startIndexlist) and bstep < len(endIndexlist):
while startIndexlist[astep] >= endIndexlist[bstep]:
bstep = bstep + 1
strTemp = cls_obj().__GetMiddleStr(content, startIndexlist[astep], endIndexlist[bstep])
strTemp = dr.sub('', strTemp)
reStrList.append(strTemp)
astep = astep + 1
bstep = bstep + 1
return reStrList
# if __name__=="__main__":
# strgg = '24214jnjkanrhquihrghjw<>eufhuin/jfghs<>ajfjsanfghjkg/hjkghj<>kghjfasd/sdaf'
# print(TestString.GetTestString(strgg,'<a href="','/'))
File: /timer.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/8/14 9:11 AM
# @Author : Criss Chan
# @Site : https://blog.csdn.net/crisschan
# @File : timer.py
# @Software: PyCharm
# @instruction:计算耗时的装饰器封装
import time
# 这是装饰函数
class Timer(object):
def timer(func):
def wrapper(*args, **kw):
t1=time.time()
# 这是函数真正执行的地方
func(*args, **kw)
t2=time.time()
# 计算下时长
cost_time = t2-t1
print("花费时间:{}秒".format(cost_time))
return wrapper
#使用举例子
if __name__ == '__main__':
@Timer.timer
def want_sleep(sleep_time):
time.sleep(sleep_time)
want_sleep(10)
File: /ureboot_processer.py
Content:
#!/usr/bin/env python
# coding=utf8
# __author_='crisschan'
# __data__='20160908'
# __from__='EmmaTools https://github.com/crisschan/EMMATools'
# __instruction__=修改脚本后不用重启服务,调用watchdog不用启动识别修改
import os, sys, time, subprocess
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
def log(s):
print('[Monitor] %s' % s)
class MyFileSystemEventHander(FileSystemEventHandler):
def __init__(self, fn):
super(MyFileSystemEventHander, self).__init__()
self.restart = fn
def on_any_event(self, event):
if event.src_path.endswith('.py'):
log('Python source file changed: %s' % event.src_path)
self.restart()
command = ['echo', 'ok']
process = None
def kill_process():
global process
if process:
log('Kill process [%s]...' % process.pid)
process.kill()
process.wait()
log('Process ended with code %s.' % process.returncode)
process = None
def start_process():
global process, command
log('Start process %s...' % ' '.join(command))
process = subprocess.Popen(command, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr)
def restart_process():
kill_process()
start_process()
def start_watch(path, callback):
observer = Observer()
observer.schedule(MyFileSystemEventHander(restart_process), path, recursive=True)
observer.start()
log('Watching directory %s...' % path)
start_process()
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == '__main__':
argv = sys.argv[1:]
if not argv:
print('Usage: ./UrebootProcesser xxxx.py')
exit(0)
if argv[0] != 'python':
argv.insert(0, 'python')
command = argv
path = os.path.abspath('.')
print(path)
start_watch(path, None)
File: /video_change.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/5/17 8:59 AM
# @Author : Criss Chan
# @Site :
# @File : video_change_proprity.py
# @Software: PyCharm
# @instruction: 视频修改码率的,为创建各种不同测试使用的视频文件做准备
from pydub import AudioSegment
class VideoChange(object):
def __init__(self,video_path):
self.__video_path = video_path
self.change_frame_rate()
def change_frame_rate(self):
'''
z转换码率(到喜马拉雅可以识别)
:return: null
'''
# 读取音频文件,设置采样率<default=44100>
song = AudioSegment.from_mp3(self.__video_path).set_frame_rate(128050)
# 按32k的bitrate导出文件到指定路径,这里是直接覆盖原文件
song.export(self.__video_path, format='mp3', bitrate='64k')
if __name__ == '__main__':
VideoChange('奈飞持续交付实践探寻上.mp3')
VideoChange('奈飞持续交付实践探寻下.mp3')
File: /video_merge_audio.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : video_merge_audio.py
@Time : 2022/12/27 15:36:57
@Author : CrissChan
@Version : 1.0
@Site : https://blog.csdn.net/crisschan
@Desc : 利用moviepy库,给视频添加一个背景音乐
'''
from moviepy.editor import *
class VideoMergeAudio(object):
def __init__(self,video_file,audio_file,out_file_path='video_merge_audio.mp4',duration_flag = 0) -> None:
'''
@des :构造函数
@params :
video_file视频的绝对地址和文件名
audio_file音频的绝对地址和文件名
out_file_path 输出文件名 默认video_merge_audio.mp4
dration_flag如果是0,那么最终视频尝试以video_file长度为准
如果是1,那么最终视频尝试以audio_file长度为准
@return : None
'''
self.video_file = video_file
self.audio_file = audio_file
self.duration_flag = duration_flag
self.out_file_path=out_file_path
pass
def merge(self):
'''
@des :合并视频和音频
@params :
video_file视频的绝对地址和文件名
audio_file音频的绝对地址和文件名
duration_flag如果是0,那么最终视频尝试以video_file长度为准
如果是1,那么最终视频尝试以audio_file长度为准
@return : 返回合并后的视频文件的绝对地址
'''
video_clip = VideoFileClip(self.video_file)
audio_clip = AudioFileClip(self.audio_file)
if self.duration_flag == 0:
final_clip = video_clip.set_audio(audio_clip).set_duration(video_clip.duration)
else:
final_clip = video_clip.set_audio(audio_clip).set_duration(audio_clip.duration)
final_clip.write_videofile(self.out_file_path)
if __name__ == '__main__':
vma = VideoMergeAudio('1.mp4','1.mp3',duration_flag=1)
vma.merge()
File: /work_webchat_robot.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/9/18 2:35 PM
# @Author : Criss Chan
# @Site : https://blog.csdn.net/crisschan
# @File : work_webchat.py
# @Software: PyCharm
# @instruction:使用企业微信的机器人群发消息,在企业微信群里面,选择添加一个机器人,然后复制对应的webhook地址
# 每个机器人发送的消息不能超过20条/分钟。
from enum import Enum
import requests
import json
class MSGTYPE(Enum):
TEXT = 'text' # 文本类型。对应消息的限制:文本内容,最长不超过2048个字节,必须是utf8编码
MARKDOWN = 'markdown' # markdown类型,对应消息内容限制:markdown内容,最长不超过4096个字节,必须是utf8编码
IMAGE = 'image' # 图片,对应图片的base64编码限制:图片(base64编码前)最大不能超过2M,支持JPG,PNG格式
NEWS = 'news' # 图文消息,限制:图文消息,一个图文消息支持1到8条图文
# title 是标题,不超过128个字节,超过会自动截断
# description,描述不超过512个字节,超过会自动截断
# url,点击后跳转的链接。
# picurl图文消息的图片链接,支持JPG、PNG格式,较好的效果为大图 1068*455,小图150*150。
class workWechat(object):
def __init__(self, webhook, msgtype, dictdetail, mentioned_mobile_list=''):
'''
:param webhook: 机器人的webhook地址
:param msgtype: 消息类型,具体参见MSGTYPE枚举类型
:param dictdetail: 消息主体内容
:param mentioned_mobile_list: 手机号列表,提醒手机号对应的群成员(@某个成员),@all表示提醒所有人(该参数只在消息类型为text的时候有效
'''
self.webhook = webhook
self.msgtype = msgtype
self.mentioned_mobile_list = mentioned_mobile_list
self.detail = dictdetail
self._send_msg()
def _send_msg(self):
headers = {'Content-Type': 'application/json'}
payload = self._payload()
res = requests.post(self.webhook, data=payload.encode('utf-8'), headers=headers)
#{"errcode":0,"errmsg":"ok"}
if json.loads(res.text)['errmsg']=='ok':
print('scuccess')
else:
print('faile')
def _payload(self):
payload = ''
mentioned_mobile = ''
try:
if len(self.mentioned_mobile_list) > 0:
mentioned_mobile = '","'.join(self.mentioned_mobile_list)
mentioned_mobile = '"mentioned_mobile_list":["' + mentioned_mobile + '"],'
except:
mentioned_mobile = ''
if self.msgtype is MSGTYPE.NEWS:
payload = '{"msgtype": "' + self.msgtype.value + '","' + self.msgtype.value + '": { ' + mentioned_mobile + ' "articles" : [' + self.detail + ']}}'
elif self.msgtype is MSGTYPE.MARKDOWN:
payload = '{"msgtype": "' + self.msgtype.value + '","' + self.msgtype.value + '": { ' + mentioned_mobile + '"content":"' + self.detail + '"}}'
elif self.msgtype is MSGTYPE.TEXT:
payload = '{"msgtype": "' + self.msgtype.value + '","' + self.msgtype.value + '": { ' + mentioned_mobile + '"content":"' + self.detail + '"}}'
elif self.msgtype is MSGTYPE.IMAGE:
payload = '{"msgtype": "' + self.msgtype.value + '","' + self.msgtype.value + '": { ' + mentioned_mobile + '"base64":' + self.detail + ',"md5":"MD5"}'
else:
payload = ''
return payload
# if __name__ == '__main__':
# wwc = workWechat('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=',
# MSGTYPE.MARKDOWN,
# '实时新增用户反馈<font color=\\\"warning\\\">132例</font>,请相关同事注意。\n>类型:<font color=\\\"comment\\\">用户反馈</font> \n>普通用户反馈:<font color=\\\"comment\\\">117例</font> \n >VIP用户反馈:<font color=\\\"comment\\\">15例</font>',)
# wwc = workWechat('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=',
# MSGTYPE.TEXT, ' "实时新增用户反馈,请注意')
# wwc = workWechat('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=',
# MSGTYPE.IMAGE, ' "')
# aritcle = '''{
# "title" : "就是你",
# "description" : "gitflow",
# "url" : "https://blog.csdn.net/crisschan/article/details/100922668",
# "picurl" : "https://i.loli.net/2019/09/17/4wPgvOm72Q9zT8K.png"
# }'''
# wwc = workWechat('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=',
# MSGTYPE.NEWS, aritcle)
File: /zip_memless.py
Content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/5/20 1:56 下午
# @Author : CrissChan
# @Site : https://blog.csdn.net/crisschan
# @File : zip_memless.py
# @Intro :低内存压缩
import zipfly
class ZipMemless(object):
def __init__(self,paths,out_zip):
'''
:param paths: # paths = [{
'fs': 'home/user/Videos/jupiter.mp4',被压缩文件物理位置1
'n': 'movies/jupiter.mp4'在压缩包中的全路径1
},{
'fs': 'home/user/Documents/mercury.mp4',被压缩文件物理位置2
'n': 'movies/mercury.mp4'在压缩包中的全路径2
},]
:param out_zip: 输出文件.zip的全路径
'''
self.paths=paths
self.out_zip = out_zip
self.__zip()
def __zip(self):
zfly = zipfly.ZipFly(paths=self.paths)
generator = zfly.generator()
with open(self.out_zip, "wb") as f:
for i in generator:
f.write(i)
# if __name__ == '__main__':
# paths = [{
# 'fs': 'home/user/Videos/jupiter.mp4',
# 'n': 'movies/jupiter.mp4'
# },]
# out_file = '/Users/crisschan/PySpace/1.zip'
# ZipMemless(paths,out_file)
File: /.vscode/launch.json
Content:
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}