前言

本学期有一门音视频处理的课程,虽然老师上课上得确实过于粗糙了,但是确实激起了我的兴趣,课后在不断解决自己的疑惑的过程中还学到了很多有用但冷门的知识。日后应该也会写一篇笔记来记录学习的过程。

本次实践主题为G711 A律编码,是一种在通信领域应用广泛的编解码标准之一,是非均匀PCM编码的一种,具有良好的压缩效果和兼容性,因此广泛运用在通信领域,且主要在欧洲和亚洲地区广泛运用,我国也广泛采用此编码标准。

G711A律编码原理

非均匀PCM

对于输入的音频信号,每个时刻的信号幅值是完全不同的,最为直观的感受就是在音频播放器中,播放音乐时打开示波器,可以看到音频信号幅值随着播放起起伏伏。变化速度快,且幅值变化没有规律,此时采用均匀PCM编码处理,将输入信号的幅度范围进行等间隔划分的量化,会导致高频部分失真,也无法捕捉到音频信号频繁的动态变化。

音频波形

这时候就可以考虑非均匀变换,根据不同的幅值情况采用非均匀的间隔量化,可以在达到压缩目的的同时获得更高的精度,尽可能不让关键信息丢失。G711A律编码就是非均匀PCM编码的一种。

对于G711 A律编码,也分为连续信号与离散信号的两种情况。

连续信号

对于连续的信号,A律编码采用了两段近似的方法,其表达式为:

其中,A为峰值幅度。A0为控制压缩度的常数,常取A0 = 87.6。当 |x| ≤ (A/A0),我们将其视为幅值小的信号,当 A ≥ |x| ≥ (A/A0)时,我们将其视为幅值大的信号。

根据表达式,可以得到如下的函数图像:

连续G711A律编码

从上图我们可以看出:

  • 对于幅值大的信号,幅值越高,变化得越平缓,压缩精度较高;

  • 对于幅值小的信号,幅值越高,变化得越陡峭,压缩精度较低;

有了编码压缩的过程,那自然就有解码的过程,也就是对编码函数取逆函数,其表达式如下:

离散信号

使用连续的方法对信号进行G711A律编码显然计算量很大,而且对于计算机中的音频数据,读取后都是离散的信号集合,因此我们需要一个迫近的连续处理图像的离散函数,此时就有了接下来的13段折线压扩,也就是离散化的方式,此过程也被称为量化。

13段折现法的量化方式为:

  1. 将输入信号的幅度范围归一化为(-1,1);

  2. 对x正半轴的区间(0,1),以1/2递减规律将该区间不均匀地分成8段:

    以(0,1)的中点1/2为界将该区间等分两段;

    以(0,1/2)的中点1/4为界将该区间等分两段;

    以(0,1/4)的中点1/8为界将该区间等分两段;

    以(0,1/8)的中点1/16为界将该区间等分两段;

    以(0,1/16)的中点1/32为界将该区间等分两段;

    以(0,1/32)的中点1/64为界将该区间等分两段;

    以 (0,1/64)的中点1/128为界将该区间等分两段,共计8段;

  3. 对于上述的8段区间,每个区间再均匀分成16段;那么,最终整个(0,1)区间被分成128段。

  4. 对x负半轴(-1,0)区间,采用上述 2,3的操作将该区间分成128段;这样,最终x轴的(-1,1)区间被分成256个区间。

  5. 将y轴(-1,1)区间均匀分成256段 ,共256个量化级。

  6. 把x轴和y轴的相应交点连接起来,得到13折线(其中正负轴的第一段和第二段折线斜率相同,合并为一条线段)。

按照步骤可以绘制出以下函数图像:

可以看出,与连续信号编码函数图像十分接近。

解码过程就是编码过程的反操作,因此不再赘述。

代码实现

一般来说,G711 A律编码的输入音频信号的位深为16bit,编码后的位深为8bit,因此接下来都以16bit的音频作为例子。

其中,测试用的音频数据我使用的是如下三段音频:

  • 单声道:wav5.wav
  • 双声道:wav6.wav,youjianchuiyan.wav

使用audioop库进行实现

在Python中,可以直接使用第三方库audioop直接进行G711 A律的编解码操作。在编解码操作之前,需要打开和读取音频,可以使用Python内置的wave库进行读取。

  • 打开音频:wav_in = wave.open(文件路径,"rb")“rb”是以2进制进行读取。

  • 获取音频信息:nchannels, sampwidth, framerate, nframes, comptype, compname = wav_in.getparams()

    参数 翻译 含义
    nchannels 声道数 1为单声道,2为双声道
    sampwidth 采样位宽 每个采样点用多少位,常见为16bit
    framerate 比特率 每秒所包含的数据大小
    nframes 总采样点 离散信号的总采样点(x的个数)
    comptype 压缩类型 指音频采用的压缩标准,如“MPEG”,如没有则显示“None”
    compname 压缩方法 压缩算法或压缩格式,如未压缩显示“not compressed”
  • 读取音频:wav_in.readframes(nframes)

  • G711 A律编码:audioop.lin2alaw(音频数据,sampwidth)

    • sampwidth = 1 -> 8 bit的采样位宽,在此过程中就是编码后音频的位宽。
    • sampwidth = 2 -> 16 bit的采样位宽,在此过程中就是输入音频与解码音频的采样位宽。
  • 保存音频:

    1. 设置保存音频的参数:nchannels, sampwidth, framerate, nframes, comptype, compname = wav_in.getparams()
    2. 写入:wav_out.writeframes(audio_data),其中audio_data为bit数据流。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import wave
import audioop


def encode_G711(input_path):
# 打开 WAV 文件
with wave.open(input_path, 'rb') as wav_in:
# 获取采样参数
nchannels, sampwidth, framerate, nframes, comptype, compname = wav_in.getparams()

# 读取所有采样值
frames = wav_in.readframes(nframes)

# 对采样值进行 A 律压缩编码
compressed_data = audioop.lin2alaw(frames, sampwidth)

# 创建新的 WAV 文件并写入压缩后的数据
with wave.open('./output/encode.wav', 'wb') as wav_out:
# 设置参数
wav_out.setparams((nchannels, sampwidth, framerate, nframes, comptype, compname))
# 写入
wav_out.writeframes(compressed_data)
print('G.711 A-law encoding is done')

def encode_G711(input_path):
# 打开 WAV 文件
with wave.open(input_path, 'rb') as wav_in:
# 获取采样参数
nchannels, sampwidth, framerate, nframes, comptype, compname = wav_in.getparams()

# 读取所有采样值
frames = wav_in.readframes(nframes)

# 对采样值进行 A 律压缩解码
compressed_data = audioop.lin2alaw(frames, sampwidth)

# 创建新的 WAV 文件并写入压缩后的数据
with wave.open('./output/encode.wav', 'wb') as wav_out:
# 设置参数
wav_out.setparams((nchannels, sampwidth, framerate, nframes, comptype, compname))
# 写入
wav_out.writeframes(compressed_data)
print('G.711 A-law encoding is done')


if __name__ == '__main__':
input_path = "./input/wav6.wav"
encode_G711(input_path)
input_path = "./output/encode.wav"
decode_G711(input_path)

因为处理的结果与自行编程实现的处理结果一样,因此我只展示自行编程实现的过程,处理结果在[量化——编码](# 量化——编码)与[量化——解码](# 量化——解码)部分有展示。

原理编程实现

根据G711 A律编码的原理,要实现编码需要经过采样、量化、编码三个过程。

  • 采样:将连续的音频数据进行离散化,此步骤还需要将音频bit流转化为整数。
  • 量化:将离散化的数据按照13折现法的量化范围映射到一个固定数量的量化级别上。
  • 编码:将整数转化为bit流,并将其转换为一个8位或者16位二进制数据。

采样

audioop库中的linear2alawalaw2linear函数,采样与转化bit流的过程并没有体现。所以,不能像调用audioop库一样简单使用wave读取音频数据,将bit流转化为整数的过程也需要单独实现,且需要根据不同的音频格式采取不同的转化策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import wave
import numpy as np

def read_audio(input_path):
"""
读取音频数据
:param input_path: 音频路径
:return: 读取出的音频数据 -> list
"""
with wave.open(input_path, 'rb') as wav_in:
# 获取音频格式信息
nchannels, sampwidth, framerate, nframes, comptype, compname = wav_in.getparams()
print("音频格式信息:")
print(f"声道数:{nchannels},样本大小:{sampwidth} ,采样率:{framerate} ,采样点数:{nframes},"
f"压缩类型:{comptype},compname:{compname}")

# 读取所有采样值
frames = wav_in.readframes(nframes)

# 根据不同的sampwidth采取不同的读取策略
if sampwidth == 1:
# 对于采样深度为1的音频,采取一个一个字节读入
audio_data = np.frombuffer(frames, dtype=np.int8).reshape(-1, nchannels).copy()
elif sampwidth == 2:
# 对于采样深度为2的音频,采取两个两个字节读入
audio_data = np.frombuffer(frames, dtype=np.int16).reshape(-1, nchannels).copy()

# print(audio_data.shape)
# 如果音频为双声道,需要将其分为左右声道
if nchannels == 2:
left_channel = audio_data[:, 0]
right_channel = audio_data[:, 1]
# 将左右声道数据装入一个list中
left_right_data = [left_channel, right_channel]
else:
left_right_data = [audio_data[:, 0]]
return left_right_data, nchannels, sampwidth, framerate, nframes

对于不同的sampwidth,使用np.frombuffer读取时,读取后的返回值为整数列表,对于不同的采样宽度:

  • sampwidth = 1 时,dtype=np.int8采取一个一个字节读入的方式
  • sampwidth = 2 时,dtype=np.int16采取两个两个字节读入的方式

当输入音频的声道数为2时,不能直接像单声道那样直接处理,而是应该将左右声道分开处理,后续对其分别进行编码与解码的操作。reshape(-1, nchannels)是为了方便将采样数据分离。

  • nchannels = 2 时,将采样数据分为audio_data[:, 0]audio_data[:, 1]
  • nchannels = 1 时,不进行分开处理,但为了格式与nchannels = 2保持一致,还是使用[audio_data[:, 0]]作为最后的读取结果。

量化——编码

对转化为整数的数据列表,根据13折现法进行编码量化,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from alive_progress import alive_bar

# 符号位掩码
SIGN_BIT = 128
# 量化位掩码
QUANT_MASK = 15
# 段偏移码
SEG_SHIFT = 4
# 段掩码
SEG_MASK = 112

# 定义编码的划分点
seg_aend = [31, 63, 127, 255, 511, 1023, 2047, 4095]

def linear2alaw(audio_data):
"""
对音频数据进行G711A律编码
:param audio_data: 待编码音频数据
:return: 编码后的音频数据 -> list
"""
# 用于储存编码结果
result = list()
print("---G711 a-law encoding begins---")
# 使用进度条方便查看处理进度
with alive_bar(len(audio_data), force_tty=True) as bar:
for pcm_val in audio_data:
# 右移 3 位相当于除以 8
pcm_val = pcm_val >> 3

# 设置掩码值
if pcm_val >= 0:
mask = 213
else:
mask = 85
pcm_val = -pcm_val - 1
# 确定pcm_val所属的段
seg = search(pcm_val, seg_aend, 8)

# 超出预期线性段范围
if seg >= 8:
result.append(127 ^ mask)
else:
# 计算编码值的高 3 位
aval = seg << SEG_SHIFT
if seg < 2:
aval |= (pcm_val >> 1) & QUANT_MASK
else:
aval |= (pcm_val >> seg) & QUANT_MASK
# 计算完整 A-law 编码值并添加到result中
result.append(aval ^ mask)
bar()
return result

量化——解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def alaw2linear(encode_data):
"""
G711 a律解码
:param encode_data: 经过编码的音频数据
:return: 解码后的音频数据 -> list
"""
# 用于储存解码结果
result = list()
print("---G711 a-law decoding begins---")
# 使用进度条方便查看处理进度
with alive_bar(len(encode_data), force_tty=True) as bar:
for a_val in encode_data:
# 将正负分离
a_val ^= 85
# 计算 t 的低 12 位
t = (a_val & QUANT_MASK) << 4
# 计算线性段索引
seg = (a_val & SEG_MASK) >> SEG_SHIFT

# 计算解码值
if seg == 0:
t += 8
elif seg == 1:
t += 264
else:
t += 264
t <<= seg - 1
# 确定符号并将结果加入result
if a_val & SIGN_BIT:
result.append(t)
else:
result.append(-t)
bar()
return result

编码

因为在read_audio函数(音频读取部分)我们对nchannels = 2的音频进行了分开处理,所以我们保存音频的部分需要将分开的后的音频数据合并,这边我使用np.column_stack将分离后的数据合并。

对于nchannels = 1的音频数据,因为在read_audio函数中保持了相同的格式,所以另一个声道数据为[]`。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import io
import wave
import struct
from itertools import chain
from alive_progress import alive_bar

def save_audio(output_path, data, nchannels, sampwidth, framerate, nframes, processbar=1):
# 将所有通道的数据组合
recovered_audio_data = np.column_stack(tuple(data))
data = recovered_audio_data

# print(sampwidth)
# 获取进度条的总进度
if nchannels == 2:
processbar = 2

# 将整数数组转换为字节流
byte_stream = io.BytesIO()
print("---Audio is being synthesized---")
if sampwidth == 1:
with alive_bar(len(data) * processbar, force_tty=True) as bar:
for sample in chain.from_iterable(data):
# 'B':无符号整数,将整数转换为一个长度为1字节的字符串
byte_stream.write(struct.pack('B', sample))
bar()
elif sampwidth == 2:
with alive_bar(len(data) * processbar, force_tty=True) as bar:
for sample in chain.from_iterable(data):
# 'h' 有符号短整型,将整数转换为一个长度为2字节的字符串
byte_stream.write(struct.pack('h', int(sample)))
bar()

# 将字节流写入
with wave.open(output_path, 'wb') as wav_out:
# 设置音频参数
wav_out.setnchannels(nchannels)
wav_out.setsampwidth(sampwidth)
wav_out.setframerate(framerate)
wav_out.setnframes(nframes)
# 按照参数将字节流写入
wav_out.writeframes(byte_stream.getvalue())
if sampwidth == 1:
print('---G.711 A-law encoding is done---')
elif sampwidth == 2:
print("---G.711 A-law decoding is done---")
print(f"The audio is saved in '{output_path}'")
print("-" * 100)

对于整数数组转换为字节流的过程中,对于不同的sampwidth,我们需要采取不同二进制打包策略,不然无法正确输出音频文件。

  • sampwidth = 1 时,数据经过linear2alaw函数的编码,因此我们需要将整数转换为一个长度为1字节的字符串,使用'B':无符号整数的打包格式
  • sampwidth = 1 时,数据经过alaw2linear函数的解码,因此我们需要将整数转换为一个长度为2字节的字符串,使用'h':有符号短整型的打包格式

在最后使用wav进行音频文件的输出时,根据目的不同使用不同的sampwidth 取值。

  • 编码后的数据,采样位深一般由 2 → 1。
  • 解码后的数据,采样位深一般由 1 → 2。

测试

由于在read_audio函数中将左右声道数据分开后打包装入left_right_data列表中,所以需要使用循环将其取出。处理后将其打包装入data列表中,方便后续save_audio函数处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def test_g711(audio_data, type=1):
"""
对音频进行编码与解码操作
:param audio_data:待处理的音频数据
:param type:处理类型:1-编码 2-解码
:return:处理后的数据 -> list
"""
data = []
# 读取所有通道的音频信息
for channel in audio_data:
# 编码操作
if type == 1:
encoded_channel = linear2alaw(channel)
data.append(encoded_channel)
# 解码操作
else:
decoded_channel = alaw2linear(channel)
data.append(decoded_channel)
return data
G711A律编码结果

测试对象我选择了wav5.wavwav6.wav作为处理对象,分别对应音频单声道与双声道的情况。下面只展示linear2alaw函数处理的结果。

结果及分析:

  • wav5.wav

    程序输出结果:

    image-20230505164211005

    使用ffprobe -i wav5_encode.wav -show_format命令查看格式信息:

    image-20230505164602673
  • wav6.wav

    程序输出结果:

    image-20230505164329917

    使用ffprobe -i wav6_encode.wav -show_format命令查看格式信息:

image-20230505164758748

从格式上可以看出,sizebit_rate都相比原音频减少了约1/2。其他参数并没有明显变化。播放编码后的音频wav5_encode.wavwav6_encode.wav可以听出有非常明显的杂音,类似于老电视机的沙沙声,但是人声得以保留,有一部分语音确实有些损坏,只有仔细才能听出说话的内容。

G711A律解码结果

测试对象使用编码后的wav5_encode.wavwav6_encode.wav,分别对应音频单声道与双声道的情况。下面只展示alaw2linear函数处理的结果,使用audioop库的处理结果相同。

结果及分析:

  • wav5_encode.wav

    image-20230505165948766

    使用ffprobe -i wav5_decode.wav -show_format命令查看格式信息:

    image-20230505170215701
  • wav6_encode.wav

    image-20230505170017115

    使用ffprobe -i wav6_decode.wav -show_format命令查看格式信息:

    image-20230505170339724

从格式上可以看出wav5_decode.wavwav6_decode.wav sizebit_rate分别是wav5_encode.wavwav6_encode.wav的两倍,也就是这两项指数回到了原音频的水准。

使用播放器播放wav5_decode.wavwav6_decode.wav,与原音频wav5.wavwav6.wav听起来几乎没有差别。

效率与进度条

如果输出的是老师准备的youjianchuiyan.wav(时长:3分15秒),那么处理时长会有点长(2分半左右)。因为运行效率不高,所以为了更加直观查看到处理的过程,我采用alive_bar库对编码、解码、转换为字节流的部分添加上了进度条,这在编解码与结果展示部分也有体现。

image-20230505190307758

这是因为编码函数linear2alaw与解码函数alaw2linear使用的是for循环对值进行一个个的处理,因此处理效率较为低下。但是编码与解码的过程可以完美的实现,且可以满足大部分音频的G711A律压扩的要求。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import io
import wave
import struct
import numpy as np
from itertools import chain
from alive_progress import alive_bar

# 符号位掩码
SIGN_BIT = 128
# 量化位掩码
QUANT_MASK = 15
# 段偏移码
SEG_SHIFT = 4
# 段掩码
SEG_MASK = 112

# 定义编码的划分点
seg_aend = [31, 63, 127, 255, 511, 1023, 2047, 4095]


def search(val, array, size):
"""
在给定数组中查找 val小于等于的元素,并返回索引
:param val: 输入值
:param array: 划分点列表
:param size: 线性段的数目
:return: 返回线性段索引
"""
for i in range(size):
if val <= array[i]:
return i
return size


def linear2alaw(audio_data):
"""
对音频数据进行G711A律编码
:param audio_data: 待编码音频数据
:return: 编码后的音频数据 -> list
"""
# 用于储存编码结果
result = list()
print("---G711 a-law encoding begins---")
# 使用进度条方便查看处理进度
with alive_bar(len(audio_data), force_tty=True) as bar:
for pcm_val in audio_data:
# 右移 3 位相当于除以 8
pcm_val = pcm_val >> 3

# 设置掩码值
if pcm_val >= 0:
mask = 213
else:
mask = 85
pcm_val = -pcm_val - 1
# 确定pcm_val所属的段
seg = search(pcm_val, seg_aend, 8)

# 超出预期线性段范围
if seg >= 8:
result.append(127 ^ mask)
else:
# 计算编码值的高 3 位
aval = seg << SEG_SHIFT
if seg < 2:
aval |= (pcm_val >> 1) & QUANT_MASK
else:
aval |= (pcm_val >> seg) & QUANT_MASK
# 计算完整 A-law 编码值并添加到result中
result.append(aval ^ mask)
bar()
return result


def alaw2linear(encode_data):
"""
G711 a律解码
:param encode_data: 经过编码的音频数据
:return: 解码后的音频数据 -> list
"""
# 用于储存解码结果
result = list()
print("---G711 a-law decoding begins---")
# 使用进度条方便查看处理进度
with alive_bar(len(encode_data), force_tty=True) as bar:
for a_val in encode_data:
# 将正负分离
a_val ^= 85
# 计算 t 的低 12 位
t = (a_val & QUANT_MASK) << 4
# 计算线性段索引
seg = (a_val & SEG_MASK) >> SEG_SHIFT

# 计算解码值
if seg == 0:
t += 8
elif seg == 1:
t += 264
else:
t += 264
t <<= seg - 1
# 确定符号并将结果加入result
if a_val & SIGN_BIT:
result.append(t)
else:
result.append(-t)
bar()
return result


def read_audio(input_path):
"""
读取音频数据
:param input_path: 音频路径
:return: 读取出的音频数据 -> list
"""
with wave.open(input_path, 'rb') as wav_in:
# 获取音频格式信息
nchannels, sampwidth, framerate, nframes, comptype, compname = wav_in.getparams()
print("音频格式信息:")
print(f"声道数:{nchannels},样本大小:{sampwidth} ,采样率:{framerate} ,采样点数:{nframes},"
f"压缩类型:{comptype},compname:{compname}")

# 读取所有采样值
frames = wav_in.readframes(nframes)

# 根据不同的sampwidth采取不同的读取策略
if sampwidth == 1:
# 对于采样深度为1的音频,采取一个一个字节读入
audio_data = np.frombuffer(frames, dtype=np.int8).reshape(-1, nchannels).copy()
elif sampwidth == 2:
# 对于采样深度为2的音频,采取两个两个字节读入
audio_data = np.frombuffer(frames, dtype=np.int16).reshape(-1, nchannels).copy()

# print(audio_data.shape)
# 如果音频为双声道,需要将其分为左右声道
if nchannels == 2:
left_channel = audio_data[:, 0]
right_channel = audio_data[:, 1]
# 将左右声道数据装入一个list中
left_right_data = [left_channel, right_channel]
else:
left_right_data = [audio_data[:, 0]]
return left_right_data, nchannels, sampwidth, framerate, nframes


def save_audio(output_path, data, nchannels, sampwidth, framerate, nframes, processbar=1):
# 将所有通道的数据组合
recovered_audio_data = np.column_stack(tuple(data))
data = recovered_audio_data

# print(sampwidth)
# 获取进度条的总进度
if nchannels == 2:
processbar = 2

# 将元组转换为字节流
byte_stream = io.BytesIO()
print("---Audio is being synthesized---")
if sampwidth == 1:
with alive_bar(len(data) * processbar, force_tty=True) as bar:
for sample in chain.from_iterable(data):
# 'B':无符号整数,将整数转换为一个长度为1字节的字符串
byte_stream.write(struct.pack('B', sample))
bar()
elif sampwidth == 2:
with alive_bar(len(data) * processbar, force_tty=True) as bar:
for sample in chain.from_iterable(data):
# 'h' 表示有符号短整型,将整数转换为一个长度为2字节的字符串
byte_stream.write(struct.pack('h', int(sample)))
bar()

# 将字节流写入
with wave.open(output_path, 'wb') as wav_out:
# 设置音频参数
wav_out.setnchannels(nchannels)
wav_out.setsampwidth(sampwidth)
wav_out.setframerate(framerate)
wav_out.setnframes(nframes)
# 按照参数将字节流写入
wav_out.writeframes(byte_stream.getvalue())
if sampwidth == 1:
print('---G.711 A-law encoding is done---')
elif sampwidth == 2:
print("---G.711 A-law decoding is done---")
print(f"The audio is saved in '{output_path}'")
print("-" * 100)


def test_g711(audio_data, type=1):
"""
对音频进行编码与解码操作
:param audio_data:待处理的音频数据
:param type:处理类型:1-编码 2-解码
:return:处理后的数据 -> list
"""
data = []
# 读取所有通道的音频信息
for channel in audio_data:
# 编码操作
if type == 1:
encoded_channel = linear2alaw(channel)
data.append(encoded_channel)
# 解码操作
else:
decoded_channel = alaw2linear(channel)
data.append(decoded_channel)
return data


if __name__ == '__main__':
# 编码
input_path = "./input/wav6.wav"
audio_data, nchannels, sampwidth, framerate, nframes = read_audio(input_path)
encoded_data = test_g711(audio_data, type=1)
# print(encoded_data)
save_audio(output_path="./output/encode_demo.wav", data=encoded_data, nchannels=nchannels, sampwidth=1,
framerate=framerate, nframes=nframes)
# 解码
encode_path = "./output/encode_demo.wav"
audio_data, nchannels, sampwidth, framerate, nframes = read_audio(encode_path)
decoded_data = test_g711(audio_data, type=2)
save_audio(output_path="./output/decode_demo.wav", data=decoded_data, nchannels=nchannels, sampwidth=2,
framerate=framerate, nframes=nframes)