2021年9月30日
psycopg2のインストール方法
こちらのサイトを参考にインストール
2023年で新しい環境にしたら、import readlineは不要になった
from os import replace
from tracemalloc import stop
import numpy as np
import datetime
import psycopg2
import sys
#import readline
#洗浄記録を面検査記録から自動登録
# 日付の取得
d_today = datetime.date.today().strftime('%Y年%m月%d日')
dsn = "dbname=mecsample host=192.168.1.1 user=postgres"
#dsn = "dbname=mecsample host=127.0.0.1 user=postgres password=mec"
'pow, chackr, coment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) '
' COUNT(*) AS CNT, MAX(date) AS clean_date '\
SELECT_SQL1 = 'SELECT MAX(date), tbl1.bc, tbl1.pow'\
' FROM (SELECT lotno, bc, pow FROM 生産指図書 WHERE lotno = %s) AS tbl1 ' \
' LEFT OUTER JOIN record_block_bc ON SUBSTRING(record_block_bc.lotno, 1 ,7) = tbl1.lotno ' \
' GROUP BY tbl1.lotno, tbl1.bc, tbl1.pow '
def readSurefaceData(tempDate, tempDateEnd):
# DB接続 & Ver情報取得と出力
try:
conn = psycopg2.connect(dsn)
cur = conn.cursor()
tempDateEnd, '%Y%m%d') + datetime.timedelta(days=1))))
readList = cur.fetchall()
except ConnectionError as err:
print(err + '読み込みに失敗しました。')
else:
print('')
finally:
cur.close()
conn.close()
return readList
def readBlockData(tempReadList):
# DB接続 & Ver情報取得と出力
try:
conn = psycopg2.connect(dsn)
cur = conn.cursor()
readList_0 =
for row in tempReadList:
cur.execute(SELECT_SQL1, (row[0],))
readList_0.append(cur.fetchall())
except ConnectionError as err:
print(err + '読み込みに失敗しました。')
else:
print('')
finally:
cur.close()
conn.close()
return readList_0
def replaceDate(str):
"""
日付の年月日を‐に置き換える。
Parameters
----------
str : string
変更する日付文字列。
Returns
-------
str : string
置き換え後の日付文字列。
"""
str = str.replace('年', '-')
str = str.replace('月', '-')
str = str.replace('日', '')
return str
def insertDB(insertStr, loopCnt):
"""
DBに洗浄データ登録。
Parameters
----------
insertStr : string
SQLのパラメータ内容。
loopCnt : int
DBの登録繰り返し回数。
"""
# DB接続 & Ver情報取得と出力
try:
conn = psycopg2.connect(dsn)
cur = conn.cursor()
for num in range(loopCnt):
r = insertStr[num].split(',')
conn.commit()
except ConnectionError as err:
conn.rollback()
print(err + '洗浄登録に失敗しました。')
else:
print('洗浄登録が完了しました。')
finally:
cur.close()
conn.close()
return
def Insert_CleanData(readList, readList1):
# 文字データの作成
materialList_0 =
listCnt = 0
for cnt, list in enumerate(readList):
list2 = readList1[cnt]
if list[4] is None:
for cnt1 in range(int(list[1])):
dt1 = list2[0][0] + datetime.timedelta(minutes=(25 + cnt1))
materialList_0.append('999,' + str(cnt1 + 1) + ',0,' + str(dt1) + ',' + list[0] + str(cnt1 + 1).zfill(2))
materialList_0[listCnt] += ',26,' + str(list2[0][1]) + ',' + str(list2[0][2]) + ', 8,'
listCnt += 1
elif list[3] > int(list[1]):
print(list[0] + ' 個数異常')
else:
for cnt1 in range(list[3], int(list[1])):
dt1 = list[4] + datetime.timedelta(minutes=(25 + cnt1))
materialList_0.append('999,' + str(cnt1 + 1) + ',0,' + str(dt1) + ',' + list[0] + str(cnt1 + 1).zfill(2))
materialList_0[listCnt] += ',26,' + str(list2[0][1]) + ',' + str(list2[0][2]) + ', 8,'
listCnt += 1
#readListの返しが洗浄記録が無い場合も1個戻ってくる
# 年月日の文字列置き換え
for num in range(listCnt):
materialList_0[num] = replaceDate(materialList_0[num])
if listCnt > 0:
insertDB(materialList_0, num + 1)
else:
print("登録するデータが存在しません。")
return
flagtype = 1
while flagtype == 1:
inputDate = input('日付を入力して下さい')
while len(inputDate) != 8:
inputDate = input('日付を入力して下さい')
else:
inputDateEnd = input("終わりの日付を入力して下さい")
if len(inputDateEnd) == 8:
readList = readSurefaceData(inputDate, inputDateEnd)
readList1 = readBlockData(readList)
Insert_CleanData(readList, readList1)
else:
print("終了します。")
sys.exit