|
- import time
-
- import openpyxl
- import pandas as pd
- import sqlite3
- import os
- from PySide6.QtWidgets import QMessageBox
- from openpyxl.styles import Font, NamedStyle, Alignment
- from openpyxl.utils.dataframe import dataframe_to_rows
- from openpyxl import Workbook
-
-
- def export_combined_data(ui, utf_en, db_path, export_folder, file_name):
- try:
- # 解码 utf_en
- decoded_utf_en = utf_en.decode('utf-8') if isinstance(utf_en, bytes) else utf_en
-
- # 创建一个新的 Excel 工作簿
- wb = Workbook()
-
- # 导出初始数据到第一个 sheet
- ws_initial = wb.active
- ws_initial.title = "初始高差数据"
-
- # 合并指定的单元格
- merge_cells = ['A1:B1', 'D1:E1', 'F1:G1', 'I1:J1']
- for cell_range in merge_cells:
- ws_initial.merge_cells(cell_range)
-
- # 设置合并单元格后的居中对齐
- for cell_range in merge_cells:
- cell = ws_initial[cell_range.split(':')[0]]
- cell.alignment = Alignment(horizontal='center', vertical='center')
-
- # 连接数据库
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
-
- # 查询 GC_Input_Param 表获取 ObservationLevel, Last_ResultName, New_ResultName, Ms_Station 字段
- query_observation_level = "SELECT ObservationLevel, Last_ResultName, New_ResultName, Ms_Station FROM GC_Input_Param WHERE TableName=?"
- cursor.execute(query_observation_level, (utf_en,))
- result = cursor.fetchone()
- if result:
- observation_level, last_result_name, new_result_name, ms_station = result
- else:
- observation_level = "未找到"
- last_result_name = "未找到"
- new_result_name = "未找到"
- ms_station = "未找到"
-
- # 填写表头信息
- ws_initial['A1'] = '观测等级'
- ws_initial['C1'] = observation_level # 将 ObservationLevel 写入 C1 单元格
- ws_initial['A2'] = '序号'
- ws_initial['B2'] = '起点'
- ws_initial['C2'] = '终点'
- ws_initial['D2'] = '高差(m)'
- ws_initial['E2'] = '路线长(km)'
- ws_initial['F1'] = '测站全中误差(mm)'
- ws_initial['F2'] = '序号'
- ws_initial['G2'] = '起点'
- ws_initial['H2'] = '终点'
- ws_initial['I2'] = '高差(m)'
- ws_initial['J2'] = '路线长(km)'
-
- # 写入 Last_ResultName, New_ResultName, Ms_Station 到相应的单元格
- ws_initial['D1'] = last_result_name
- ws_initial['I1'] = new_result_name
- ws_initial['H1'] = ms_station
-
- # 设置 H1 单元格的数值格式为保留两位小数
- ws_initial['H1'].number_format = '0.00'
-
- # 查询数据库表为DataFrame,添加 TableName 条件
- query = "SELECT * FROM GC_Input_Point WHERE TableName=?"
- cursor.execute(query, (utf_en,))
- rows = cursor.fetchall()
-
- # 获取列名
- column_names = [description[0] for description in cursor.description]
-
- # 定义需要的列索引
- last_id_index = column_names.index('Last_ID')
- last_spname_index = column_names.index('Last_SPName')
- last_epname_index = column_names.index('Last_EPName')
- last_hdiff_index = column_names.index('Last_HDiff')
- last_rlen_index = column_names.index('Last_RLen')
- new_id_index = column_names.index('New_ID')
- new_spname_index = column_names.index('New_SPName')
- new_epname_index = column_names.index('New_EPName')
- new_hdiff_index = column_names.index('New_HDiff')
- new_rlen_index = column_names.index('New_RLen')
-
- # 填充数据到Excel
- for idx, row in enumerate(rows, start=3):
- ws_initial.cell(row=idx, column=1, value=row[last_id_index])
- ws_initial.cell(row=idx, column=2, value=row[last_spname_index])
- ws_initial.cell(row=idx, column=3, value=row[last_epname_index])
- ws_initial.cell(row=idx, column=4, value=row[last_hdiff_index])
- ws_initial.cell(row=idx, column=5, value=row[last_rlen_index])
- ws_initial.cell(row=idx, column=6, value=row[new_id_index])
- ws_initial.cell(row=idx, column=7, value=row[new_spname_index])
- ws_initial.cell(row=idx, column=8, value=row[new_epname_index])
- ws_initial.cell(row=idx, column=9, value=row[new_hdiff_index])
- ws_initial.cell(row=idx, column=10, value=row[new_rlen_index])
-
- # 设置 D, E, I, J 列为强制保留六位小数
- ws_initial.cell(row=idx, column=4).number_format = '0.000000'
- ws_initial.cell(row=idx, column=5).number_format = '0.000000'
- ws_initial.cell(row=idx, column=9).number_format = '0.000000'
- ws_initial.cell(row=idx, column=10).number_format = '0.000000'
-
- # 设置B,C,G,H的列宽为9.25
- ws_initial.column_dimensions['B'].width = 9.25
- ws_initial.column_dimensions['C'].width = 9.25
- ws_initial.column_dimensions['G'].width = 9.25
- ws_initial.column_dimensions['H'].width = 9.25
-
- # 设置 D, E, I, J 列的列宽为11.5
- ws_initial.column_dimensions['D'].width = 11.5
- ws_initial.column_dimensions['E'].width = 11.5
- ws_initial.column_dimensions['I'].width = 11.5
- ws_initial.column_dimensions['J'].width = 11.5
-
- # 导出成果数据到第二个 sheet
- ws_result = wb.create_sheet(title="成果高差数据")
-
- # 查询数据库表为DataFrame,添加 TableName 条件
- query = "SELECT * FROM GC_Output_Point WHERE TableName=?"
- df = pd.read_sql_query(query, conn, params=(utf_en,))
-
- # 检查是否有匹配的数据
- if df.empty:
- QMessageBox.warning(ui, '警告', '没有找到匹配的数据进行导出')
- conn.close()
- return
-
- # 假设 TableName 字段是以字节序列存储的 UTF-8 编码字符串
- if 'TableName' in df.columns:
- try:
- df['TableName'] = df['TableName'].apply(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x)
- except Exception as e:
- QMessageBox.critical(ui, '错误', f'TableName 字段解码失败: {str(e)}')
- conn.close()
- return
-
- # 删除 TableName 列
- if 'TableName' in df.columns:
- df.drop(columns=['TableName'], inplace=True)
-
- # new HDiff 保留小数点后6位
- if 'New_HDiff' in df.columns:
- df['New_HDiff'] = df['New_HDiff'].round(6)
-
- # New_RLen 保留小数点后6位
- if 'New_RLen' in df.columns:
- df['New_RLen'] = df['New_RLen'].round(6)
-
- # Correct_Factor 保留小数点后2位
- if 'Correct_Factor' in df.columns:
- df['Correct_Factor'] = df['Correct_Factor'].round(2)
-
- # Period_Diff 保留小数点后2位
- if 'Period_Diff' in df.columns:
- df['Period_Diff'] = df['Period_Diff'].round(2)
-
- # 重命名指定的列
- column_mapping = {
- 'New_ID': '序号',
- 'New_ResultName': '结果期数',
- 'New_SPName': '起点',
- 'New_EPName': '终点',
- 'New_HDiff': '高差',
- 'New_RLen': '路线长',
- 'Correct_Factor': '修正数',
- 'Period_Diff': '期间差异',
- 'Dis_Ass': '变形判定'
- }
- df.rename(columns=column_mapping, inplace=True)
-
- # 查询 GC_Input_Param 表获取 Correct_Factor 字段
- query_param = "SELECT Correct_Factor FROM GC_Input_Param WHERE TableName=?"
- correct_factor_df = pd.read_sql_query(query_param, conn, params=(utf_en,))
-
- correct_factor = correct_factor_df.iloc[0]['Correct_Factor']
-
- # 在E1单元格添加内容“修正系数”
- ws_result.cell(row=1, column=1, value="序号")
- ws_result.cell(row=1, column=2, value="结果期数")
- ws_result.cell(row=1, column=3, value="起点")
- ws_result.cell(row=1, column=4, value="终点")
- ws_result.cell(row=1, column=5, value="修正系数")
- ws_result.cell(row=1, column=6, value=correct_factor)
- ws_result.cell(row=1, column=7, value="修正数")
- ws_result.cell(row=1, column=8, value="期间差异")
- ws_result.cell(row=1, column=9, value="变形判定")
-
- # 将 DataFrame 写入工作表
- for r in dataframe_to_rows(df, index=False, header=True):
- ws_result.append(r)
-
- # 定义自定义样式
- style_0_000000 = NamedStyle(name="style_0_000000", number_format='0.000000')
- style_0_00 = NamedStyle(name="style_0_00", number_format='0.00')
-
- # 添加样式到工作簿
- wb.add_named_style(style_0_000000)
- wb.add_named_style(style_0_00)
-
- # 应用样式到相应列
- hdiff_col_index = df.columns.get_loc('高差') + 1 # 获取“高差”列的索引(+1 因为 Excel 列索引从 1 开始)
- rlen_col_index = df.columns.get_loc('路线长') + 1 # 获取“路线长”列的索引
- correct_factor_col_index = df.columns.get_loc('修正数') + 1 # 获取“修正数”列的索引
- period_diff_col_index = df.columns.get_loc('期间差异') + 1 # 获取“期间差异”列的索引
-
- for row in range(2, ws_result.max_row + 1): # 从第二行开始,因为第一行为标题行
- ws_result.cell(row=row, column=hdiff_col_index).style = style_0_000000
- ws_result.cell(row=row, column=rlen_col_index).style = style_0_000000
- ws_result.cell(row=row, column=correct_factor_col_index).style = style_0_00
- ws_result.cell(row=row, column=period_diff_col_index).style = style_0_00
-
- # 设置 Dis_Ass 列为“变形”的行的字体颜色为红色
- red_font = Font(color="FF0000")
- dis_ass_col_index = df.columns.get_loc('变形判定') + 1 # 获取“变形判定”列的索引
-
- for row in range(2, ws_result.max_row + 1): # 从第二行开始,因为第一行为标题行
- cell_value = ws_result.cell(row=row, column=dis_ass_col_index).value
- if cell_value == '变形':
- for col in range(1, ws_result.max_column + 1):
- ws_result.cell(row=row, column=col).font = red_font
-
- # 设置列宽
- ws_result.column_dimensions['E'].width = 12
- ws_result.column_dimensions['F'].width = 12
-
- # 保存 Excel 文件之前添加单元格合并操作
- merge_cells = ['A1:A2', 'B1:B2', 'C1:C2', 'D1:D2', 'G1:G2', 'H1:H2', 'I1:I2']
- for cell_range in merge_cells:
- ws_result.merge_cells(cell_range)
-
- # 设置合并单元格后的居中对齐
- for cell_range in merge_cells:
- cell = ws_result[cell_range.split(':')[0]]
- cell.alignment = Alignment(horizontal='center', vertical='center')
-
- # 获取当前时间并格式化为字符串,例如:20231010_143000
- timestamp = time.strftime("%Y%m%d", time.localtime())
-
- # 设置文件名
- excel_filepath = os.path.join(export_folder, file_name)
-
- # 保存 Excel 文件
- wb.save(excel_filepath)
-
- QMessageBox.information(ui, '成功', f'数据文件已导出到 {export_folder}')
- except Exception as e:
- if isinstance(e, PermissionError) or (isinstance(e, OSError) and e.errno == 13):
- QMessageBox.critical(ui, '错误', '请确认文件没有被其他程序(如Excel、文本编辑器等)打开。')
- else:
- QMessageBox.critical(ui, '错误', f'导出过程中发生错误: {str(e)}')
- print(e)
- finally:
- # 关闭数据库连接
- conn.close()
-
- def export_example_data(ui, utf_en, db_path, export_folder, file_name):
- try:
- # 解码 utf_en
- decoded_utf_en = utf_en.decode('utf-8') if isinstance(utf_en, bytes) else utf_en
-
- # 创建一个新的 Excel 工作簿
- wb = Workbook()
- ws = wb.active
-
- # 合并指定的单元格
- merge_cells = ['A1:B1', 'D1:E1', 'F1:G1', 'I1:J1']
- for cell_range in merge_cells:
- ws.merge_cells(cell_range)
-
- # 设置合并单元格后的居中对齐
- for cell_range in merge_cells:
- cell = ws[cell_range.split(':')[0]]
- cell.alignment = Alignment(horizontal='center', vertical='center')
- # 连接数据库
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
-
- # 查询 GC_Input_Param 表获取 ObservationLevel, Last_ResultName, New_ResultName, Ms_Station 字段
- query_observation_level = "SELECT ObservationLevel, Last_ResultName, New_ResultName, Ms_Station FROM GC_Input_Param WHERE TableName=?"
- cursor.execute(query_observation_level, (utf_en,))
- result = cursor.fetchone()
- if result:
- observation_level, last_result_name, new_result_name, ms_station = result
- else:
- observation_level = "未找到"
- last_result_name = "未找到"
- new_result_name = "未找到"
- ms_station = "未找到"
-
- # 填写表头信息
- ws['A1'] = '观测等级'
- ws['C1'] = observation_level # 将 ObservationLevel 写入 C1 单元格
- ws['A2'] = '序号'
- ws['B2'] = '起点'
- ws['C2'] = '终点'
- ws['D2'] = '高差(m)'
- ws['E2'] = '路线长(km)'
- ws['F1'] = '测站全中误差(mm)'
- ws['F2'] = '序号'
- ws['G2'] = '起点'
- ws['H2'] = '终点'
- ws['I2'] = '高差(m)'
- ws['J2'] = '路线长(km)'
-
- # 写入 Last_ResultName, New_ResultName, Ms_Station 到相应的单元格
- ws['D1'] = last_result_name
- ws['I1'] = new_result_name
- ws['H1'] = ms_station
-
- # 设置 H1 单元格的数值格式为保留两位小数
- ws['H1'].number_format = '0.00'
-
- # 连接数据库
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
-
- # 查询数据库表为DataFrame,添加 TableName 条件
- query = "SELECT * FROM GC_Input_Point WHERE TableName=?"
- cursor.execute(query, (utf_en,))
- rows = cursor.fetchall()
-
- # 获取列名
- column_names = [description[0] for description in cursor.description]
-
- # 定义需要的列索引
- last_id_index = column_names.index('Last_ID')
- last_spname_index = column_names.index('Last_SPName')
- last_epname_index = column_names.index('Last_EPName')
- last_hdiff_index = column_names.index('Last_HDiff')
- last_rlen_index = column_names.index('Last_RLen')
- new_id_index = column_names.index('New_ID')
- new_spname_index = column_names.index('New_SPName')
- new_epname_index = column_names.index('New_EPName')
- new_hdiff_index = column_names.index('New_HDiff')
- new_rlen_index = column_names.index('New_RLen')
-
- # 填充数据到Excel
- for idx, row in enumerate(rows, start=3):
- ws.cell(row=idx, column=1, value=row[last_id_index])
- ws.cell(row=idx, column=2, value=row[last_spname_index])
- ws.cell(row=idx, column=3, value=row[last_epname_index])
- ws.cell(row=idx, column=4, value=row[last_hdiff_index])
- ws.cell(row=idx, column=5, value=row[last_rlen_index])
- ws.cell(row=idx, column=6, value=row[new_id_index])
- ws.cell(row=idx, column=7, value=row[new_spname_index])
- ws.cell(row=idx, column=8, value=row[new_epname_index])
- ws.cell(row=idx, column=9, value=row[new_hdiff_index])
- ws.cell(row=idx, column=10, value=row[new_rlen_index])
-
- # 设置 D, E, I, J 列为强制保留六位小数
- ws.cell(row=idx, column=4).number_format = '0.000000'
- ws.cell(row=idx, column=5).number_format = '0.000000'
- ws.cell(row=idx, column=9).number_format = '0.000000'
- ws.cell(row=idx, column=10).number_format = '0.000000'
- # 设置B,C,G,H的列宽为9.25
- ws.column_dimensions['B'].width = 9.25
- ws.column_dimensions['C'].width = 9.25
- ws.column_dimensions['G'].width = 9.25
- ws.column_dimensions['H'].width = 9.25
-
- # 设置 D, E, I, J 列的列宽为11.5
- ws.column_dimensions['D'].width = 11.5
- ws.column_dimensions['E'].width = 11.5
- ws.column_dimensions['I'].width = 11.5
- ws.column_dimensions['J'].width = 11.5
-
- # 设置文件名
- # excel_filename = f"{os.path.splitext(decoded_utf_en)[0]}.xlsx"
- excel_filepath = os.path.join(export_folder, file_name)
-
- # 保存 Excel 文件
- wb.save(excel_filepath)
-
- QMessageBox.information(ui, '成功', f'示例文件已成功导出到 {export_folder}')
- except Exception as e:
- if isinstance(e, PermissionError) or (isinstance(e, OSError) and e.errno == 13):
- QMessageBox.critical(ui, '错误', '请确认文件没有被其他程序(如Excel、文本编辑器等)打开。')
- else:
- QMessageBox.critical(ui, '错误', f'导出初始数据过程中发生错误: {str(e)}')
- print(e)
-
-
- def process_utf_en(utf_en):
- # 确保 utf_en 是字节字符串
- if not isinstance(utf_en, bytes):
- utf_en = utf_en.encode('utf-8')
-
- # 将字节字符串解码为普通字符串
- file_name = utf_en.decode('utf-8')
-
- # 检查文件后缀名并进行相应处理
- if file_name.endswith('.xls'):
- # 去掉 .xls 后缀并添加 .xlsx 后缀
- file_name = file_name[:-4] + '.xlsx'
-
- # 将处理后的字符串重新编码为字节字符串
- utf_en = file_name.encode('utf-8')
-
- return utf_en
-
-
- def main_function_example(ui, utf_en, db_path, export_folder, file_name):
- # 调用导出初始数据函数
- export_example_data(ui, utf_en, db_path, export_folder, file_name)
-
-
- def main_function_combined(ui, utf_en, db_path, export_folder, file_name):
- export_combined_data(ui, utf_en, db_path, export_folder, file_name)
|