- add additional project areas for the automatic inclinometer import

- beginning with voegelsberg (ftp download, moving objects)
This commit is contained in:
Arno Kaimbacher 2022-03-23 16:30:17 +01:00
parent e068773eec
commit 7f08225b40
12 changed files with 1145 additions and 298 deletions
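The commit message mentions an FTP download for the new voegelsberg project area; that code lives in other files of this commit and is not part of the excerpt below. As a rough orientation only, a minimal sketch of such a download step is given here — the host, credentials, remote folder and file pattern are hypothetical placeholders read from environment variables, not the actual voegelsberg configuration.

import os
from ftplib import FTP

def download_voegelsberg_exports(local_dir: str):
    ''' fetch all csv exports from a (hypothetical) voegelsberg ftp folder '''
    ftp = FTP(os.getenv('VOEGELSBERG_FTP_HOST'))
    ftp.login(user=os.getenv('VOEGELSBERG_FTP_USER'),
              passwd=os.getenv('VOEGELSBERG_FTP_PASSWORD'))
    ftp.cwd('/voegelsberg/inclinometer')  # hypothetical remote folder
    for filename in ftp.nlst():
        if not filename.lower().endswith('.csv'):
            continue
        # download each export file into the local working directory
        local_path = os.path.join(local_dir, filename)
        with open(local_path, 'wb') as file_handle:
            ftp.retrbinary('RETR ' + filename, file_handle.write)
    ftp.quit()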


@ -37,14 +37,14 @@ def main():
# sensor name in postgis db
# sensor = 'wolfsegg_kb1_0'
platform = 'laakirchen_inclinometer'
sensor_env_list = os.getenv('LAAKIRCHEN_SENSORS').replace('\n', '')
sensor_list = json.loads(sensor_env_list)
# print(sensor_list)
firebird_session: session = create_session()
# this will print elements along with their index value
for sensor_id, sensor in enumerate(sensor_list):
# db_observation = session.query(Observation) \
# .filter_by(name='John Snow').first()
query_count = firebird_session.query(FbObservation).join(FbObservation.catena) \
@ -53,22 +53,24 @@ def main():
.filter(
or_(
FbObservation.temperature != None,
FbObservation.pitch != None # this is used to check NULL values
)) \
.count()
# if query_count == 0:
# print(f"sensor {sensor} "
# f"doesn't have any observations with measured values in firebird database!")
# # hop to next for iteration, next sensor in list
# continue
# test = query_count.statement.compile(dialect=firebird.dialect())
firebird_observations: List[FbObservation] = []
if query_count > 0:
query = firebird_session.query(FbObservation).join(FbObservation.catena) \
.filter(FbObservation.sensore == sensor_id) \
.filter(Catena.name == feature_of_interest)
# print (query.statement.compile(dialect=firebird.dialect()))
firebird_observations = query.all()
firebird_session.close()
pg_session: session = create_pg_session()
# pg_datasets: List[Dataset] = pg_query.all()
@ -77,20 +79,20 @@ def main():
.join(Phenomenon) \
.filter(Procedure.sta_identifier == sensor.lower())
# .join(Platform).all() \
roll_dataset: Dataset = pg_query.filter(
Phenomenon.sta_identifier == "Roll").first()
slope_dataset: Dataset = pg_query.filter(
Phenomenon.sta_identifier == "Slope").first()
temperature_dataset: Dataset = pg_query.filter(
Phenomenon.sta_identifier == "InSystemTemperature").first()
platform_exists = pg_session.query(Platform.id).filter_by(
name=platform.lower()).scalar() is not None
if not platform_exists:
sensor_platform = Platform()
sensor_platform.sta_identifier = platform.lower()
sensor_platform.identifier = platform.lower()
sensor_platform.name = platform.lower()
@ -107,8 +109,7 @@ def main():
# commit dataset changes:
pg_session.commit()
format_exists: bool = pg_session.query(Format.id).filter_by(
definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"
).scalar() is not None
@ -121,12 +122,19 @@ def main():
roll_dataset.fk_format_id = sensor_format.id
temperature_dataset.fk_format_id = sensor_format.id
pg_session.commit()
if query_count == 0:
print(f"sensor {sensor} "
f"doesn't have any observations with measured values in firebird database!")
# hop to next for iteration, next sensor in list, don't insert any observations
continue
create_db_observations(firebird_observations, roll_dataset,
slope_dataset, temperature_dataset, pg_session)
# commit new observations:
pg_session.commit()
if len(roll_dataset.observations) > 0:
# if not published yet, publish the roll dataset
if not roll_dataset.is_published:
@ -135,7 +143,7 @@ def main():
roll_dataset.dataset_type = "timeseries"
roll_dataset.observation_type = "simple"
roll_dataset.value_type = "quantity"
if len(slope_dataset.observations) > 0:
# if not published yet, publish the slope dataset
if not slope_dataset.is_published:
@ -144,7 +152,7 @@ def main():
slope_dataset.dataset_type = "timeseries"
slope_dataset.observation_type = "simple"
slope_dataset.value_type = "quantity"
if len(temperature_dataset.observations) > 0:
# if not published yet, publish the temperature dataset
if not temperature_dataset.is_published:
@ -154,7 +162,6 @@ def main():
temperature_dataset.observation_type = "simple"
temperature_dataset.value_type = "quantity"
pg_session.commit()
last_roll_observation = pg_session.query(Observation) \
.filter(Observation.fk_dataset_id == roll_dataset.id) \
@ -213,80 +220,95 @@ def main():
# for loop sensors end
pg_session.close()
# firebird_session.close()
def create_db_observations(firebird_observations: List[FbObservation],
roll_dataset: Dataset,
slope_dataset: Dataset,
temperature_dataset: Dataset,
pg_session: session):
''' insert new observations into db '''
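# collect the result_time values already stored for each dataset (as unix timestamps)
# so that duplicate rows can be skipped on insert below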
roll_result = (
pg_session.query(Observation.result_time)
.filter(Observation.fk_dataset_id == roll_dataset.id)
.all()
)
roll_result_time_db_list1: List[str] = list(chain(*roll_result))
roll_result_time_db_list: List[float] = [time.mktime(
date_obj.timetuple()) for date_obj in roll_result_time_db_list1]
slope_result = (
pg_session.query(Observation.result_time)
.filter(Observation.fk_dataset_id == slope_dataset.id)
.all()
)
slope_result_time_db_list1: List[str] = list(chain(*slope_result))
slope_result_time_db_list: List[float] = [time.mktime(
date_obj.timetuple()) for date_obj in slope_result_time_db_list1]
temperature_result = (
pg_session.query(Observation.result_time)
.filter(Observation.fk_dataset_id == temperature_dataset.id)
.all()
)
temperature_result_time_db_list1: List[str] = list(
chain(*temperature_result))
temperature_result_time_db_list: List[float] = [time.mktime(
date_obj.timetuple()) for date_obj in temperature_result_time_db_list1]
for fb_observation in firebird_observations:
# print(fb_observation.catena.name)
if(fb_observation.roll is not None and roll_dataset is not None):
value = fb_observation.roll
add_observation(roll_dataset, fb_observation,
value, roll_result_time_db_list)
if(fb_observation.pitch is not None and slope_dataset is not None):
# max_id = max_id + 1
value = fb_observation.pitch
add_observation(slope_dataset, fb_observation,
value, slope_result_time_db_list)
if(fb_observation.temperature is not None and temperature_dataset is not None):
# max_id = max_id + 1
value = fb_observation.temperature
add_observation(temperature_dataset, fb_observation,
value, temperature_result_time_db_list)
def add_observation(
dataset: Dataset,
fb_observation: FbObservation,
value: str,
value_identifier_db_list: List[float]):
''' check whether the observation already exists in the db,
otherwise add it to the dataset '''
# ob_id: str = str(observation_json.get('id'))
# existing_observation: bool = (
# db_session.query(Observation)
# .filter(Observation.result_time == fb_observation.result_time,
# Observation.fk_dataset_id == dataset.id)
# .one_or_none()
# )
existing_observation: bool = time.mktime(
fb_observation.result_time.timetuple()) in value_identifier_db_list
# Can we insert this observation?
if existing_observation is False:
# insert new observation
new_observation = Observation(
# id=max_id,
value_type='quantity',
sampling_time_start=fb_observation.result_time,
sampling_time_end=fb_observation.result_time,
result_time=fb_observation.result_time,
sta_identifier=str(uuid.uuid4()),
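# the unix timestamp of result_time doubles as value_identifier and drives the duplicate check above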
value_identifier=str(time.mktime(
fb_observation.result_time.timetuple())),
value_quantity=value
)
dataset.observations.append(new_observation)
print(f"new observation with result time {new_observation.result_time} "
f"for inclinometer {dataset.procedure.name} succesfully imported!")
@ -294,6 +316,7 @@ def add_observation(dataset: Dataset, fb_observation: FbObservation, value: str,
print(f"observation with result time {fb_observation.result_time} "
f"for inclinometer {dataset.procedure.name} already exists!")
# -----------------------------------------------------------------------------
if __name__ == "__main__":
load_dotenv(find_dotenv())