机器学习——上机4——决策树

上机实验:决策树

任务1:分支节点的选择方法

现有一个数据集 weekend.txt,目标是根据一个人的特征来预测其周末是否出行。

所有特征均为二元特征,取值为 0 或 1,其中“status”(目标特征也是类别)表示用户的周末是否出行,1 表示出行,0 表示不出行,“marriageStatus”表示申请人是否已婚、“hasChild”表示申请人是否有小孩、“hasAppointment”表示申请人是否有约、“weather”表示天气是否晴朗。

已知信息熵和信息增益的公式为:

$$\text{Entropy}(D)=-\sum_{k=1}^{C}p_k \cdot log_2(p_k)$$

$$\text{InfoGain}(D, a)=\text{Entropy}(D)-\sum_{v=1}^{V}\frac{|D^v|}{|D|} \cdot\text{Entropy}(D^v)$$

请完成以下三个内容:

  • 请自定义函数 cal_entropy(data, feature_name)计算数据集data关于feature_name的信息熵。输入参数 data 为 DataFrame,feature_name 为目标特征(或类别)的名称;

  • 请调用 cal_entropy() 函数计算决策树分支之前的信息熵,保存为 data_entropy;

  • 请自定义函数 cal_infoGain(data, base_entropy) 计算 weekend.txt 中各个特征的信息增益,保存为列表 infogains,并选择信息增益最大的分支节点 best_feature。

补全代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# 导入必要库
import numpy as np
import pandas as pd

# 读取数据(假设文件为tab分隔,包含特征和目标变量'status')
weekend_data = pd.read_table('weekend.txt', sep=' ')

## 自定义熵计算函数
def cal_entropy(data, feature_name):
'''
计算给定特征的信息熵
:param data: 输入的DataFrame数据集
:param feature_name: 需要计算熵的目标特征名称
:return: 保留三位小数的熵值
'''
entropy = 0 # 初始化熵值
num = data.shape[0] # 获取数据集总样本数

# 统计目标特征各取值的频数(如:status特征中"出门"和"不出门"的数量)
freq_stats = data[feature_name].value_counts()

# 遍历每个不同取值计算熵
for freq in freq_stats:
prob = freq / num # 计算当前取值的概率
# 根据信息熵公式累加计算(注意:熵公式为负数求和)
entropy -= prob * np.log2(prob) # 等价于 entropy += -prob * log2(prob)

return round(entropy, 3) # 保留三位小数

## 计算初始信息熵(假设目标特征列为'status')
data_entropy = cal_entropy(weekend_data, 'status')

## 自定义信息增益计算函数
def cal_infoGain(data, base_entropy):
'''
计算所有特征的信息增益并选择最优特征
:param data: 输入的DataFrame数据集
:param base_entropy: 数据集的初始熵值
:return: 信息增益列表和最优特征名称
'''
infogain_list = [] # 存储各特征的信息增益
total_samples = data.shape[0] # 总样本数
feature_list = list(data.columns.values) # 获取所有特征名称
feature_list.remove('status') # 移除目标特征(避免计算自身)

# 遍历每个特征计算信息增益
for feature in feature_list:
sub_entropy = 0 # 初始化条件熵

# 获取当前特征的取值分布(如:天气特征的"晴朗/下雨/阴天")
value_counts = data[feature].value_counts()

# 遍历特征的每个取值
for value, count in value_counts.items():
# 获取特征取当前值的子集
subset = data[data[feature] == value]
subset_samples = subset.shape[0] # 子集样本数
weight = subset_samples / total_samples # 计算权重

# 计算子集的熵并累加加权熵
sub_entropy += weight * cal_entropy(subset, 'status')

# 计算信息增益(信息增益 = 基础熵 - 条件熵)
info_gain = base_entropy - sub_entropy
infogain_list.append(round(info_gain, 4)) # 保留四位小数

# 找到信息增益最大的特征
max_gain = max(infogain_list) # 最大增益值
max_index = infogain_list.index(max_gain) # 最大值索引
best_feature = feature_list[max_index] # 对应的最优特征名称

return infogain_list, best_feature

## 执行信息增益计算
infogains, best_feature = cal_infoGain(weekend_data, data_entropy)

## 结果输出
print('各特征的信息增益:', infogains)
print('\n信息增益最大的特征:', best_feature)
各特征的信息增益: [0.0076, 0.0076, 0.0322, 0.0868]

信息增益最大的特征: weather

期望输出:

任务2:常见的决策树算法

现在有一份有关商品销量的数据集product.csv,数据集的离散型特征信息如下:

特征名称 取值说明
天气 1:天气好;0:天气坏
是否周末 1:是;0:不是
是否有促销 1:有促销;0:没有促销
销量 1:销量高;0:销量低

请完成以下三个内容: - 请根据提供的商品销量数据集 data,使用 sklearn 中的 DecisionTreeClassifier()函数构建决策树模型,模型选择分支结点的特征以Gini指数为判定准则; - 训练模型,并对测试集test_X进行预测,将预测结果存为 pred_y,进行模型评估; - 将构建的决策树模型进行可视化。

补全代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import pandas as pd
from sklearn.tree import DecisionTreeClassifier, export_graphviz # 补全export_graphviz导入
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

data = pd.read_csv('product.csv')

## 对数据集切片,获取除目标特征以外的其他特征的数据记录X
X = data[["天气", "是否周末", "是否有促销"]] # 使用双括号选择多列

## 对数据集切片,获取目标特征`销量`的数据记录y
y = data["销量"]

## 使用train_test_split函数划分训练集train_X, train_y和测试集test_X, test_y
## 测试集所占比例为0.1,random_state为0
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.1, random_state=0)

## 构建分支节点选择方法为基尼指数的决策树模型tree_model,进行模型训练、测试与性能评估
tree_model = DecisionTreeClassifier(criterion='gini') # 设置基尼指数准则
tree_model.fit(train_X, train_y) # 模型训练

pred_y = tree_model.predict(test_X) # 测试集预测
print("模型分类报告:")
print(classification_report(test_y, pred_y)) # 输出评估报告

## 决策树可视化(修正特征名称与数据列一致)
import graphviz
dot_data = export_graphviz(
tree_model,
out_file=None,
feature_names=["天气", "是否周末", "是否有促销"], # 修正为完整特征名称
class_names=["销量低", "销量高"],
filled=True,
rounded=True
)
graph = graphviz.Source(dot_data)
graph
模型分类报告:

              precision    recall  f1-score   support

           0       1.00      0.50      0.67         2

           1       0.67      1.00      0.80         2

    accuracy                           0.75         4

   macro avg       0.83      0.75      0.73         4

weighted avg       0.83      0.75      0.73         4


svg

期望输出:

任务3:利用任务1的cal_infoGain函数自行实现ID3决策树算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
import numpy as np

import pandas as pd

from sklearn.model_selection import train_test_split

from sklearn.metrics import classification_report

import graphviz



## 任务1的熵计算函数(已完整实现)

def cal_entropy(data, feature_name):

'''

计算给定特征的信息熵

:param data: 输入的DataFrame数据集

:param feature_name: 需要计算熵的目标特征名称

:return: 保留三位小数的熵值

'''

entropy = 0 # 初始化熵值

num = data.shape[0] # 获取数据集总样本数



# 统计目标特征各取值的频数分布

freq_stats = data[feature_name].value_counts()



# 遍历每个不同取值计算熵

for freq in freq_stats:

prob = freq / num # 计算当前取值的概率

# 根据信息熵公式累加计算(Σ -p_i log2(p_i))

entropy -= prob * np.log2(prob) # 等价于 entropy += -prob * log2(prob)



return round(entropy, 3) # 保留三位小数



## 任务1的信息增益计算函数(已完整实现)

def cal_infoGain(data, base_entropy):

'''

计算所有特征的信息增益并选择最优特征

:param data: 输入的DataFrame数据集

:param base_entropy: 数据集的初始熵值

:return: 信息增益列表和最优特征名称

'''

infogain_list = [] # 存储各特征的信息增益

total_samples = data.shape[0] # 总样本数

feature_list = list(data.columns.values) # 所有特征名称



# 自动识别目标特征(假设目标特征不在特征列表中)

target_feature = [col for col in feature_list if col not in data.columns][0]

feature_list = [col for col in data.columns if col != target_feature] # 移除目标特征



# 遍历每个特征计算信息增益

for feature in feature_list:

sub_entropy = 0 # 初始化条件熵

# 获取当前特征的取值分布

value_counts = data[feature].value_counts()



# 计算条件熵

for value, count in value_counts.items():

subset = data[data[feature] == value] # 特征取当前值的子集

subset_samples = subset.shape[0] # 子集样本数

weight = subset_samples / total_samples # 计算权重

# 累加加权熵

sub_entropy += weight * cal_entropy(subset, target_feature)



# 计算信息增益

info_gain = base_entropy - sub_entropy

infogain_list.append(round(info_gain, 4)) # 保留四位小数



# 选择最优特征

max_gain = max(infogain_list) # 最大信息增益值

max_index = infogain_list.index(max_gain) # 最大值索引

best_feature = feature_list[max_index] # 最优特征名称



return infogain_list, best_feature



## ID3决策树实现

class ID3DecisionTree:

def __init__(self):

self.tree = None # 存储决策树结构

self.target = None # 目标特征名称



def fit(self, data, target_feature):

'''

训练决策树模型

:param data: 包含特征和目标列的DataFrame

:param target_feature: 目标特征名称(如'销量')

'''

self.target = target_feature

# 提取特征列表(排除目标特征)

features = [col for col in data.columns if col != target_feature]

# 递归构建决策树

self.tree = self._build_tree(data, features)



def _build_tree(self, data, features):

'''

递归构建决策树

:param data: 当前节点的数据集

:param features: 当前可用的特征列表

:return: 字典形式的树节点

'''

# 终止条件1:所有样本属于同一类别

if len(data[self.target].unique()) == 1:

return {

'class': data[self.target].values[0], # 叶节点类别

'samples': len(data) # 样本数量

}



# 终止条件2:无剩余特征可用时选择多数类

if not features:

class_counts = data[self.target].value_counts()

return {

'class': class_counts.idxmax(), # 多数类

'samples': len(data)

}



# 计算当前数据集的熵

base_entropy = cal_entropy(data, self.target)

# 获取最优特征和信息增益列表

info_gains, best_feature = cal_infoGain(data, base_entropy)



# 创建当前树节点

node = {

'feature': best_feature, # 分裂特征

'info_gain': info_gains[features.index(best_feature)], # 信息增益值

'samples': len(data), # 样本数量

'children': {} # 子节点

}



# 递归构建子树(排除当前最优特征)

remaining_features = [f for f in features if f != best_feature]

# 遍历最优特征的所有取值

for value in data[best_feature].unique():

subset = data[data[best_feature] == value] # 获取子集



# 处理空子集(采用父节点多数类)

if subset.empty:

class_counts = data[self.target].value_counts()

node['children'][value] = {

'class': class_counts.idxmax(),

'samples': 0

}

else:

# 递归构建子树

node['children'][value] = self._build_tree(subset, remaining_features)



return node



def predict(self, X):

'''

对新样本进行预测

:param X: 特征数据(DataFrame格式)

:return: 预测结果列表

'''

predictions = []

# 遍历每个样本

for _, sample in X.iterrows():

current_node = self.tree

# 遍历树直到叶节点

while 'children' in current_node:

feature = current_node['feature'] # 当前分裂特征

value = sample[feature] # 样本在该特征的取值



# 处理未见过的特征值(采用当前节点多数类)

if value not in current_node['children']:

class_counts = self._get_class_counts(current_node)

# 选择样本数最多的类别

predictions.append(max(class_counts, key=class_counts.get))

break # 跳出循环



# 移动到子节点

current_node = current_node['children'][value]



# 记录叶节点类别

if 'class' in current_node:

predictions.append(current_node['class'])



return predictions



def _get_class_counts(self, node):

'''

递归统计节点中的类别分布

:param node: 当前节点

:return: 类别计数字典

'''

counts = {}

# 如果是叶节点直接返回

if 'class' in node:

return {node['class']: node['samples']}



# 递归统计子节点

for child in node['children'].values():

child_counts = self._get_class_counts(child)

for cls, cnt in child_counts.items():

counts[cls] = counts.get(cls, 0) + cnt



return counts



def visualize(self, feature_names, class_names):

'''

可视化决策树

:param feature_names: 特征名称列表

:param class_names: 类别名称列表

:return: graphviz对象

'''

dot = graphviz.Digraph() # 创建有向图

# 递归构建图形

self._build_graph(dot, self.tree, feature_names, class_names)

return dot



def _build_graph(self, dot, node, feature_names, class_names, parent=None, edge_label=""):

'''

递归构建graphviz图形

:param dot: graphviz.Digraph对象

:param node: 当前节点

:param feature_names: 特征名称列表

:param class_names: 类别名称列表

:param parent: 父节点(用于连接边)

:param edge_label: 边标签(特征取值)

'''

# 叶节点样式

if 'class' in node:

label = f"{class_names[int(node['class'])]}\\n{node['samples']} samples"

dot.node(

str(id(node)), # 唯一节点ID

label,

shape="box", # 矩形框

style="filled", # 填充颜色

fillcolor="lightblue" # 浅蓝色

)

# 内部节点样式

else:

label = f"{node['feature']}\\nIG={node['info_gain']:.3f}\\n{node['samples']} samples"

dot.node(

str(id(node)),

label,

shape="ellipse", # 椭圆

style="filled",

fillcolor="lightgreen" # 浅绿色

)



# 创建父节点到当前节点的边

if parent:

dot.edge(

str(id(parent)),

str(id(node)),

label=edge_label # 显示特征取值

)



# 递归处理子节点

if 'children' in node:

for value, child in node['children'].items():

self._build_graph(

dot,

child,

feature_names,

class_names,

node, # 当前节点作为父节点

str(value) # 边标签为特征取值

)
1