123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- import time
-
- import openpyxl
- import pandas as pd
- import sqlite3
- import os
- from PySide6.QtWidgets import QMessageBox
- from openpyxl.styles import Font, NamedStyle, Alignment
- from openpyxl.utils.dataframe import dataframe_to_rows
- from openpyxl import Workbook
-
-
- def export_result_data(ui, utf_en, db_path, export_folder):
- try:
- # 解码 utf_en
- decoded_utf_en = utf_en.decode('utf-8') if isinstance(utf_en, bytes) else utf_en
- # 连接数据库
- conn = sqlite3.connect(db_path)
-
- # 查询数据库表为DataFrame,添加 TableName 条件
- query = "SELECT * FROM GC_Output_Point WHERE TableName=?"
- df = pd.read_sql_query(query, conn, params=(utf_en,))
-
- # 检查是否有匹配的数据
- if df.empty:
- QMessageBox.warning(ui, '警告', '没有找到匹配的数据进行导出')
- conn.close()
- return
-
- # 假设 TableName 字段是以字节序列存储的 UTF-8 编码字符串
- if 'TableName' in df.columns:
- try:
- df['TableName'] = df['TableName'].apply(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x)
- except Exception as e:
- QMessageBox.critical(ui, '错误', f'TableName 字段解码失败: {str(e)}')
- conn.close()
- return
-
- # 删除 TableName 列
- if 'TableName' in df.columns:
- df.drop(columns=['TableName'], inplace=True)
- # new HDiff 保留小数点后6位
- if 'New_HDiff' in df.columns:
- df['New_HDiff'] = df['New_HDiff'].round(6)
- # New_RLen 保留小数点后6位
- if 'New_RLen' in df.columns:
- df['New_RLen'] = df['New_RLen'].round(6)
- # Correct_Factor 保留小数点后2位
- if 'Correct_Factor' in df.columns:
- df['Correct_Factor'] = df['Correct_Factor'].round(2)
- # Period_Diff 保留小数点后2位
- if 'Period_Diff' in df.columns:
- df['Period_Diff'] = df['Period_Diff'].round(2)
-
- # 重命名指定的列
- column_mapping = {
- 'New_ID': '序号',
- 'New_ResultName': '结果期数',
- 'New_SPName': '起点',
- 'New_EPName': '终点',
- 'New_HDiff': '高差',
- 'New_RLen': '路线长',
- 'Correct_Factor': '修正数',
- 'Period_Diff': '期间差异',
- 'Dis_Ass': '变形判定'
- }
- df.rename(columns=column_mapping, inplace=True)
- # 查询 GC_Input_Param 表获取 Correct_Factor 字段
- query_param = "SELECT Correct_Factor FROM GC_Input_Param WHERE TableName=?"
- correct_factor_df = pd.read_sql_query(query_param, conn, params=(utf_en,))
-
- correct_factor = correct_factor_df.iloc[0]['Correct_Factor']
-
- # 创建一个新的 Excel 工作簿
- wb = Workbook()
- ws = wb.active
- # 在E1单元格添加内容“修正系数”
- ws.cell(row=1, column=1, value="序号")
- ws.cell(row=1, column=2, value="结果期数")
- ws.cell(row=1, column=3, value="起点")
- ws.cell(row=1, column=4, value="终点")
- ws.cell(row=1, column=5, value="修正系数")
- ws.cell(row=1, column=6, value=correct_factor)
- ws.cell(row=1, column=7, value="修正数")
- ws.cell(row=1, column=8, value="期间差异")
- ws.cell(row=1, column=9, value="变形判定")
-
- # 将 DataFrame 写入工作表
- for r in dataframe_to_rows(df, index=False, header=True):
- ws.append(r)
-
- # 定义自定义样式
- style_0_000000 = NamedStyle(name="style_0_000000", number_format='0.000000')
- style_0_00 = NamedStyle(name="style_0_00", number_format='0.00')
-
- # 添加样式到工作簿
- wb.add_named_style(style_0_000000)
- wb.add_named_style(style_0_00)
-
- # 应用样式到相应列
- hdiff_col_index = df.columns.get_loc('高差') + 1 # 获取“高差”列的索引(+1 因为 Excel 列索引从 1 开始)
- rlen_col_index = df.columns.get_loc('路线长') + 1 # 获取“路线长”列的索引
- correct_factor_col_index = df.columns.get_loc('修正数') + 1 # 获取“修正数”列的索引
- period_diff_col_index = df.columns.get_loc('期间差异') + 1 # 获取“期间差异”列的索引
-
- for row in range(2, ws.max_row + 1): # 从第二行开始,因为第一行为标题行
- ws.cell(row=row, column=hdiff_col_index).style = style_0_000000
- ws.cell(row=row, column=rlen_col_index).style = style_0_000000
- ws.cell(row=row, column=correct_factor_col_index).style = style_0_00
- ws.cell(row=row, column=period_diff_col_index).style = style_0_00
-
- # 设置 Dis_Ass 列为“变形”的行的字体颜色为红色
- red_font = Font(color="FF0000")
- dis_ass_col_index = df.columns.get_loc('变形判定') + 1 # 获取“变形判定”列的索引
-
- for row in range(2, ws.max_row + 1): # 从第二行开始,因为第一行为标题行
- cell_value = ws.cell(row=row, column=dis_ass_col_index).value
- if cell_value == '变形':
- for col in range(1, ws.max_column + 1):
- ws.cell(row=row, column=col).font = red_font
- # 设置列宽
- ws.column_dimensions['E'].width = 12
- ws.column_dimensions['F'].width = 12
-
- # 保存 Excel 文件之前添加单元格合并操作
- merge_cells = ['A1:A2', 'B1:B2', 'C1:C2', 'D1:D2', 'G1:G2', 'H1:H2', 'I1:I2']
- for cell_range in merge_cells:
- ws.merge_cells(cell_range)
-
- # 设置合并单元格后的居中对齐
- for cell_range in merge_cells:
- cell = ws[cell_range.split(':')[0]]
- cell.alignment = Alignment(horizontal='center', vertical='center')
- # 保存 Excel 文件
- excel_filename = f"{os.path.splitext(decoded_utf_en)[0]}-成果数据.xlsx"
- excel_filepath = os.path.join(export_folder, excel_filename)
- wb.save(excel_filepath)
-
- QMessageBox.information(ui, '成功', f'成果文件已成功导出到 {export_folder}')
- except Exception as e:
- QMessageBox.critical(ui, '错误', f'导出过程中发生错误: {str(e)}')
- finally:
- # 关闭数据库连接
- conn.close()
-
-
- def export_initial_data(ui, utf_en, db_path, export_folder):
- try:
- # 解码 utf_en
- decoded_utf_en = utf_en.decode('utf-8') if isinstance(utf_en, bytes) else utf_en
-
- # 创建一个新的 Excel 工作簿
- wb = Workbook()
- ws = wb.active
-
- # 合并指定的单元格
- merge_cells = ['A1:B1', 'D1:E1', 'F1:G1', 'I1:J1']
- for cell_range in merge_cells:
- ws.merge_cells(cell_range)
-
- # 设置合并单元格后的居中对齐
- for cell_range in merge_cells:
- cell = ws[cell_range.split(':')[0]]
- cell.alignment = Alignment(horizontal='center', vertical='center')
- # 连接数据库
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
-
- # 查询 GC_Input_Param 表获取 ObservationLevel, Last_ResultName, New_ResultName, Ms_Station 字段
- query_observation_level = "SELECT ObservationLevel, Last_ResultName, New_ResultName, Ms_Station FROM GC_Input_Param WHERE TableName=?"
- cursor.execute(query_observation_level, (utf_en,))
- result = cursor.fetchone()
- if result:
- observation_level, last_result_name, new_result_name, ms_station = result
- else:
- observation_level = "未找到"
- last_result_name = "未找到"
- new_result_name = "未找到"
- ms_station = "未找到"
-
- # 填写表头信息
- ws['A1'] = '观测等级'
- ws['C1'] = observation_level # 将 ObservationLevel 写入 C1 单元格
- ws['A2'] = '序号'
- ws['B2'] = '起点'
- ws['C2'] = '终点'
- ws['D2'] = '高差(m)'
- ws['E2'] = '路线长(km)'
- ws['F1'] = '测站全中误差(mm)'
- ws['F2'] = '序号'
- ws['G2'] = '起点'
- ws['H2'] = '终点'
- ws['I2'] = '高差(m)'
- ws['J2'] = '路线长(km)'
-
- # 写入 Last_ResultName, New_ResultName, Ms_Station 到相应的单元格
- ws['D1'] = last_result_name
- ws['I1'] = new_result_name
- ws['H1'] = ms_station
-
- # 设置 H1 单元格的数值格式为保留两位小数
- ws['H1'].number_format = '0.00'
-
- # 连接数据库
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
-
- # 查询数据库表为DataFrame,添加 TableName 条件
- query = "SELECT * FROM GC_Input_Point WHERE TableName=?"
- cursor.execute(query, (utf_en,))
- rows = cursor.fetchall()
-
- # 获取列名
- column_names = [description[0] for description in cursor.description]
-
- # 定义需要的列索引
- last_id_index = column_names.index('Last_ID')
- last_spname_index = column_names.index('Last_SPName')
- last_epname_index = column_names.index('Last_EPName')
- last_hdiff_index = column_names.index('Last_HDiff')
- last_rlen_index = column_names.index('Last_RLen')
- new_id_index = column_names.index('New_ID')
- new_spname_index = column_names.index('New_SPName')
- new_epname_index = column_names.index('New_EPName')
- new_hdiff_index = column_names.index('New_HDiff')
- new_rlen_index = column_names.index('New_RLen')
-
- # 填充数据到Excel
- for idx, row in enumerate(rows, start=3):
- ws.cell(row=idx, column=1, value=row[last_id_index])
- ws.cell(row=idx, column=2, value=row[last_spname_index])
- ws.cell(row=idx, column=3, value=row[last_epname_index])
- ws.cell(row=idx, column=4, value=row[last_hdiff_index])
- ws.cell(row=idx, column=5, value=row[last_rlen_index])
- ws.cell(row=idx, column=6, value=row[new_id_index])
- ws.cell(row=idx, column=7, value=row[new_spname_index])
- ws.cell(row=idx, column=8, value=row[new_epname_index])
- ws.cell(row=idx, column=9, value=row[new_hdiff_index])
- ws.cell(row=idx, column=10, value=row[new_rlen_index])
-
- # 设置 D, E, I, J 列为强制保留六位小数
- ws.cell(row=idx, column=4).number_format = '0.000000'
- ws.cell(row=idx, column=5).number_format = '0.000000'
- ws.cell(row=idx, column=9).number_format = '0.000000'
- ws.cell(row=idx, column=10).number_format = '0.000000'
- # 设置B,C,G,H的列宽为9.25
- ws.column_dimensions['B'].width = 9.25
- ws.column_dimensions['C'].width = 9.25
- ws.column_dimensions['G'].width = 9.25
- ws.column_dimensions['H'].width = 9.25
-
- # 设置 D, E, I, J 列的列宽为11.5
- ws.column_dimensions['D'].width = 11.5
- ws.column_dimensions['E'].width = 11.5
- ws.column_dimensions['I'].width = 11.5
- ws.column_dimensions['J'].width = 11.5
- # 设置文件名
- excel_filename = f"{os.path.splitext(decoded_utf_en)[0]}-初始数据.xlsx"
- excel_filepath = os.path.join(export_folder, excel_filename)
-
- # 保存 Excel 文件
- wb.save(excel_filepath)
-
- QMessageBox.information(ui, '成功', f'初始数据文件已成功导出到 {export_folder}')
- except Exception as e:
- QMessageBox.critical(ui, '错误', f'导出初始数据过程中发生错误: {str(e)}')
-
-
- def main_function_initial(ui, utf_en, db_path):
- # 获取应用的安装目录
- app_install_dir = os.path.dirname(os.path.abspath(__file__)) # 假设脚本文件位于应用的安装目录下
- export_folder = os.path.join(app_install_dir, 'Export')
-
- # 如果 Export 文件夹不存在,则创建它
- if not os.path.exists(export_folder):
- os.makedirs(export_folder)
-
- # 调用导出初始数据函数
- export_initial_data(ui, utf_en, db_path, export_folder)
-
-
- def main_function_result(ui, utf_en, db_path):
- # 获取应用的安装目录
- app_install_dir = os.path.dirname(os.path.abspath(__file__)) # 假设脚本文件位于应用的安装目录下
- export_folder = os.path.join(app_install_dir, 'Export')
-
- # 如果 Export 文件夹不存在,则创建它
- if not os.path.exists(export_folder):
- os.makedirs(export_folder)
- # 调用导出结果函数
- export_result_data(ui, utf_en, db_path, export_folder)
|