import time import openpyxl import pandas as pd import sqlite3 import os from PySide6.QtWidgets import QMessageBox from openpyxl.styles import Font, NamedStyle, Alignment from openpyxl.utils.dataframe import dataframe_to_rows from openpyxl import Workbook def export_result_data(ui, utf_en, db_path, export_folder): try: # 解码 utf_en decoded_utf_en = utf_en.decode('utf-8') if isinstance(utf_en, bytes) else utf_en # 连接数据库 conn = sqlite3.connect(db_path) # 查询数据库表为DataFrame,添加 TableName 条件 query = "SELECT * FROM GC_Output_Point WHERE TableName=?" df = pd.read_sql_query(query, conn, params=(utf_en,)) # 检查是否有匹配的数据 if df.empty: QMessageBox.warning(ui, '警告', '没有找到匹配的数据进行导出') conn.close() return # 假设 TableName 字段是以字节序列存储的 UTF-8 编码字符串 if 'TableName' in df.columns: try: df['TableName'] = df['TableName'].apply(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x) except Exception as e: QMessageBox.critical(ui, '错误', f'TableName 字段解码失败: {str(e)}') conn.close() return # 删除 TableName 列 if 'TableName' in df.columns: df.drop(columns=['TableName'], inplace=True) # new HDiff 保留小数点后6位 if 'New_HDiff' in df.columns: df['New_HDiff'] = df['New_HDiff'].round(6) # New_RLen 保留小数点后6位 if 'New_RLen' in df.columns: df['New_RLen'] = df['New_RLen'].round(6) # Correct_Factor 保留小数点后2位 if 'Correct_Factor' in df.columns: df['Correct_Factor'] = df['Correct_Factor'].round(2) # Period_Diff 保留小数点后2位 if 'Period_Diff' in df.columns: df['Period_Diff'] = df['Period_Diff'].round(2) # 重命名指定的列 column_mapping = { 'New_ID': '序号', 'New_ResultName': '结果期数', 'New_SPName': '起点', 'New_EPName': '终点', 'New_HDiff': '高差', 'New_RLen': '路线长', 'Correct_Factor': '修正数', 'Period_Diff': '期间差异', 'Dis_Ass': '变形判定' } df.rename(columns=column_mapping, inplace=True) # 查询 GC_Input_Param 表获取 Correct_Factor 字段 query_param = "SELECT Correct_Factor FROM GC_Input_Param WHERE TableName=?" correct_factor_df = pd.read_sql_query(query_param, conn, params=(utf_en,)) correct_factor = correct_factor_df.iloc[0]['Correct_Factor'] # 创建一个新的 Excel 工作簿 wb = Workbook() ws = wb.active # 在E1单元格添加内容“修正系数” ws.cell(row=1, column=1, value="序号") ws.cell(row=1, column=2, value="结果期数") ws.cell(row=1, column=3, value="起点") ws.cell(row=1, column=4, value="终点") ws.cell(row=1, column=5, value="修正系数") ws.cell(row=1, column=6, value=correct_factor) ws.cell(row=1, column=7, value="修正数") ws.cell(row=1, column=8, value="期间差异") ws.cell(row=1, column=9, value="变形判定") # 将 DataFrame 写入工作表 for r in dataframe_to_rows(df, index=False, header=True): ws.append(r) # 定义自定义样式 style_0_000000 = NamedStyle(name="style_0_000000", number_format='0.000000') style_0_00 = NamedStyle(name="style_0_00", number_format='0.00') # 添加样式到工作簿 wb.add_named_style(style_0_000000) wb.add_named_style(style_0_00) # 应用样式到相应列 hdiff_col_index = df.columns.get_loc('高差') + 1 # 获取“高差”列的索引(+1 因为 Excel 列索引从 1 开始) rlen_col_index = df.columns.get_loc('路线长') + 1 # 获取“路线长”列的索引 correct_factor_col_index = df.columns.get_loc('修正数') + 1 # 获取“修正数”列的索引 period_diff_col_index = df.columns.get_loc('期间差异') + 1 # 获取“期间差异”列的索引 for row in range(2, ws.max_row + 1): # 从第二行开始,因为第一行为标题行 ws.cell(row=row, column=hdiff_col_index).style = style_0_000000 ws.cell(row=row, column=rlen_col_index).style = style_0_000000 ws.cell(row=row, column=correct_factor_col_index).style = style_0_00 ws.cell(row=row, column=period_diff_col_index).style = style_0_00 # 设置 Dis_Ass 列为“变形”的行的字体颜色为红色 red_font = Font(color="FF0000") dis_ass_col_index = df.columns.get_loc('变形判定') + 1 # 获取“变形判定”列的索引 for row in range(2, ws.max_row + 1): # 从第二行开始,因为第一行为标题行 cell_value = ws.cell(row=row, column=dis_ass_col_index).value if cell_value == '变形': for col in range(1, ws.max_column + 1): ws.cell(row=row, column=col).font = red_font # 设置列宽 ws.column_dimensions['E'].width = 12 ws.column_dimensions['F'].width = 12 # 保存 Excel 文件之前添加单元格合并操作 merge_cells = ['A1:A2', 'B1:B2', 'C1:C2', 'D1:D2', 'G1:G2', 'H1:H2', 'I1:I2'] for cell_range in merge_cells: ws.merge_cells(cell_range) # 设置合并单元格后的居中对齐 for cell_range in merge_cells: cell = ws[cell_range.split(':')[0]] cell.alignment = Alignment(horizontal='center', vertical='center') # 保存 Excel 文件 excel_filename = f"{os.path.splitext(decoded_utf_en)[0]}-成果数据.xlsx" excel_filepath = os.path.join(export_folder, excel_filename) wb.save(excel_filepath) QMessageBox.information(ui, '成功', f'成果文件已成功导出到 {export_folder}') except Exception as e: QMessageBox.critical(ui, '错误', f'导出过程中发生错误: {str(e)}') finally: # 关闭数据库连接 conn.close() def export_initial_data(ui, utf_en, db_path, export_folder): try: # 解码 utf_en decoded_utf_en = utf_en.decode('utf-8') if isinstance(utf_en, bytes) else utf_en # 创建一个新的 Excel 工作簿 wb = Workbook() ws = wb.active # 合并指定的单元格 merge_cells = ['A1:B1', 'D1:E1', 'F1:G1', 'I1:J1'] for cell_range in merge_cells: ws.merge_cells(cell_range) # 设置合并单元格后的居中对齐 for cell_range in merge_cells: cell = ws[cell_range.split(':')[0]] cell.alignment = Alignment(horizontal='center', vertical='center') # 连接数据库 conn = sqlite3.connect(db_path) cursor = conn.cursor() # 查询 GC_Input_Param 表获取 ObservationLevel, Last_ResultName, New_ResultName, Ms_Station 字段 query_observation_level = "SELECT ObservationLevel, Last_ResultName, New_ResultName, Ms_Station FROM GC_Input_Param WHERE TableName=?" cursor.execute(query_observation_level, (utf_en,)) result = cursor.fetchone() if result: observation_level, last_result_name, new_result_name, ms_station = result else: observation_level = "未找到" last_result_name = "未找到" new_result_name = "未找到" ms_station = "未找到" # 填写表头信息 ws['A1'] = '观测等级' ws['C1'] = observation_level # 将 ObservationLevel 写入 C1 单元格 ws['A2'] = '序号' ws['B2'] = '起点' ws['C2'] = '终点' ws['D2'] = '高差(m)' ws['E2'] = '路线长(km)' ws['F1'] = '测站全中误差(mm)' ws['F2'] = '序号' ws['G2'] = '起点' ws['H2'] = '终点' ws['I2'] = '高差(m)' ws['J2'] = '路线长(km)' # 写入 Last_ResultName, New_ResultName, Ms_Station 到相应的单元格 ws['D1'] = last_result_name ws['I1'] = new_result_name ws['H1'] = ms_station # 设置 H1 单元格的数值格式为保留两位小数 ws['H1'].number_format = '0.00' # 连接数据库 conn = sqlite3.connect(db_path) cursor = conn.cursor() # 查询数据库表为DataFrame,添加 TableName 条件 query = "SELECT * FROM GC_Input_Point WHERE TableName=?" cursor.execute(query, (utf_en,)) rows = cursor.fetchall() # 获取列名 column_names = [description[0] for description in cursor.description] # 定义需要的列索引 last_id_index = column_names.index('Last_ID') last_spname_index = column_names.index('Last_SPName') last_epname_index = column_names.index('Last_EPName') last_hdiff_index = column_names.index('Last_HDiff') last_rlen_index = column_names.index('Last_RLen') new_id_index = column_names.index('New_ID') new_spname_index = column_names.index('New_SPName') new_epname_index = column_names.index('New_EPName') new_hdiff_index = column_names.index('New_HDiff') new_rlen_index = column_names.index('New_RLen') # 填充数据到Excel for idx, row in enumerate(rows, start=3): ws.cell(row=idx, column=1, value=row[last_id_index]) ws.cell(row=idx, column=2, value=row[last_spname_index]) ws.cell(row=idx, column=3, value=row[last_epname_index]) ws.cell(row=idx, column=4, value=row[last_hdiff_index]) ws.cell(row=idx, column=5, value=row[last_rlen_index]) ws.cell(row=idx, column=6, value=row[new_id_index]) ws.cell(row=idx, column=7, value=row[new_spname_index]) ws.cell(row=idx, column=8, value=row[new_epname_index]) ws.cell(row=idx, column=9, value=row[new_hdiff_index]) ws.cell(row=idx, column=10, value=row[new_rlen_index]) # 设置 D, E, I, J 列为强制保留六位小数 ws.cell(row=idx, column=4).number_format = '0.000000' ws.cell(row=idx, column=5).number_format = '0.000000' ws.cell(row=idx, column=9).number_format = '0.000000' ws.cell(row=idx, column=10).number_format = '0.000000' # 设置B,C,G,H的列宽为9.25 ws.column_dimensions['B'].width = 9.25 ws.column_dimensions['C'].width = 9.25 ws.column_dimensions['G'].width = 9.25 ws.column_dimensions['H'].width = 9.25 # 设置 D, E, I, J 列的列宽为11.5 ws.column_dimensions['D'].width = 11.5 ws.column_dimensions['E'].width = 11.5 ws.column_dimensions['I'].width = 11.5 ws.column_dimensions['J'].width = 11.5 # 设置文件名 excel_filename = f"{os.path.splitext(decoded_utf_en)[0]}-初始数据.xlsx" excel_filepath = os.path.join(export_folder, excel_filename) # 保存 Excel 文件 wb.save(excel_filepath) QMessageBox.information(ui, '成功', f'初始数据文件已成功导出到 {export_folder}') except Exception as e: QMessageBox.critical(ui, '错误', f'导出初始数据过程中发生错误: {str(e)}') def main_function_initial(ui, utf_en, db_path): # 获取应用的安装目录 app_install_dir = os.path.dirname(os.path.abspath(__file__)) # 假设脚本文件位于应用的安装目录下 export_folder = os.path.join(app_install_dir, 'Export') # 如果 Export 文件夹不存在,则创建它 if not os.path.exists(export_folder): os.makedirs(export_folder) # 调用导出初始数据函数 export_initial_data(ui, utf_en, db_path, export_folder) def main_function_result(ui, utf_en, db_path): # 获取应用的安装目录 app_install_dir = os.path.dirname(os.path.abspath(__file__)) # 假设脚本文件位于应用的安装目录下 export_folder = os.path.join(app_install_dir, 'Export') # 如果 Export 文件夹不存在,则创建它 if not os.path.exists(export_folder): os.makedirs(export_folder) # 调用导出结果函数 export_result_data(ui, utf_en, db_path, export_folder)