Using TimeStamps in NXCALS data
In this notebook I do a study on timestamps in NXCALS, demonstrating issues I encountered.
I found an inconsistency in the use of timestamp data, probably caused by incomplete type handling of the variables, as the examples below demonstrate.
Initialization
from cern.nxcals.api.extraction.data.builders import *
from pyspark.sql.functions import col
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import DoubleType
from pyspark.sql.types import ArrayType
import pyspark.sql.functions as func
from pyspark.sql.functions import pandas_udf, PandasUDFType
import time
import numpy as np
import pandas as pd
import os
from matplotlib import pyplot as plt
# Names this notebook uses without creating them (presumably injected by the
# notebook environment - confirm for your setup):
# sc - Spark Context
# spark - Spark Session in Memory
# Turn on the Arrow setting for PySpark; the queries below convert Spark
# datasets to pandas with toPandas().
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Bare expression: shows the session summary as the cell output.
spark
<div>
<p><b>SparkSession - in-memory</b></p>
</div>
User functions
My functions to convert time from Unix timestamps to human readable format. I prefer to use pandas functions to be consistent in my analysis.
def tstamp2datetime(tstamp):
    ''' Turn an NXCALS acqStamp (UNIX epoch, nanoseconds, UTC) into a UTC-aware pandas datetime '''
    # Interpret the integer as nanoseconds since the epoch (timezone-naive result) ...
    naive_stamp = pd.to_datetime(tstamp, unit='ns')
    # ... then label it as UTC without shifting the wall-clock value.
    utc_stamp = naive_stamp.tz_localize('UTC')
    return utc_stamp
def datetime2tstamp(dattime):
    ''' Give the UNIX timestamp in nanoseconds for a datetime-like value (anything pd.Timestamp accepts) '''
    stamp = pd.Timestamp(dattime)
    # .value is the integer number of nanoseconds since the epoch.
    return stamp.value
def _datetime2tstamp(dattime):
''' Convert pandas datetime UTC to UNIX timestamp in nanoseconds '''
return pd.Timestamp(dattime).to_datetime64().astype(int)
def getFillNo(t1:int, t2:int) -> pd.DataFrame:
    ''' Simple data query to get LHC fill numbers in a time window.

    Queries the 'HX:FILLN' variable of the 'CMW' system between t1 and t2.

    Parameters:
        t1, t2 : start / end of the window. This notebook passes either naive
            pandas Timestamps or Python ints (UNIX timestamp in ns). numpy.int64
            values are rejected by the NXCALS builder with a DateTimeParseException.

    Returns:
        pandas DataFrame ordered by 'nxcals_timestamp' and indexed by 'FillNo'
        (the renamed 'nxcals_value' column). It keeps all other columns of the
        dataset and adds 'testTime', the timestamp as a UTC pandas datetime.
    '''
    ds = (
        DataQuery.builder(spark).byVariables().system('CMW')
        .startTime(t1).endTime(t2)
        .variable('HX:FILLN').buildDataset()
    )
    # NOTE: a former `ds.select('nxcals_timestamp','nxcals_value')` statement was
    # dropped here: its result was never assigned, so it had no effect and all
    # columns were (and still are) returned.
    _tmpdf = ds.orderBy('nxcals_timestamp').toPandas()
    _tmpdf['testTime'] = _tmpdf['nxcals_timestamp'].apply(tstamp2datetime)
    return _tmpdf.rename(columns={'nxcals_value': 'FillNo'}).set_index('FillNo')
Timestamps consistency checks
Check that: (1) pandas timestamps can provide [ns] resolution, and (2) converting back and forth to datetime preserves the precision.
# Reference stamp; the trailing ...001 makes any loss of ns precision visible.
its = 1535801592522000001
_as_datetime = tstamp2datetime(its)
_round_trip = datetime2tstamp(_as_datetime)
print(' - Unix timestamp [ns] :', its)
print(' - tstamp2datetime :', _as_datetime)
print(' - datetime2tstamp :', _round_trip)
- Unix timestamp [ns] : 1535801592522000001
- tstamp2datetime : 2018-09-01 11:33:12.522000001+00:00
- datetime2tstamp : 1535801592522000001
# Datetime form of the reference stamp `its`, used as input to both converters
# so that only their return types differ.
ts = tstamp2datetime(its)
print ('Types : ')
print (f' datetime2tstamp = {type(datetime2tstamp(ts))}')
print (f' datetime2tstamp2 = {type(_datetime2tstamp(ts))}' )
Types :
datetime2tstamp = <class 'int'>
datetime2tstamp2 = <class 'numpy.int64'>
Timestamps in NXCALS data queries
The NXCALS data queries can accept input times in datetime non-timezone or Unix format.
There seems to be an inconsistency between the timestamp formats accepted by the DataQueries and those returned in the pandas DataFrames.
tt1, tt2 = '2018-09-01 10:12:00', '2018-10-01 10:12:00'
# -- query window given as timezone-naive pandas Timestamps
_naive_start = pd.Timestamp(tt1).tz_localize(None)
_naive_end = pd.Timestamp(tt2).tz_localize(None)
aadf = getFillNo(_naive_start, _naive_end)
aadf.head()
nxcals_entity_id | nxcals_timestamp | nxcals_variable_name | testTime | |
---|---|---|---|---|
FillNo | ||||
7113 | 1634062 | 1535801592522000000 | HX:FILLN | 2018-09-01 11:33:12.522000+00:00 |
7114 | 1634062 | 1535826483836000000 | HX:FILLN | 2018-09-01 18:28:03.836000+00:00 |
7115 | 1634062 | 1535835403041000000 | HX:FILLN | 2018-09-01 20:56:43.041000+00:00 |
7116 | 1634062 | 1535845289557000000 | HX:FILLN | 2018-09-01 23:41:29.557000+00:00 |
7117 | 1634062 | 1535851030659000000 | HX:FILLN | 2018-09-02 01:17:10.659000+00:00 |
# -- same window, given as UNIX timestamps [ns] in Python int format
_start_ns = datetime2tstamp(tt1)
_end_ns = datetime2tstamp(tt2)
bbdf = getFillNo(_start_ns, _end_ns)
bbdf.head()
nxcals_entity_id | nxcals_timestamp | nxcals_variable_name | testTime | |
---|---|---|---|---|
FillNo | ||||
7113 | 1634062 | 1535801592522000000 | HX:FILLN | 2018-09-01 11:33:12.522000+00:00 |
7114 | 1634062 | 1535826483836000000 | HX:FILLN | 2018-09-01 18:28:03.836000+00:00 |
7115 | 1634062 | 1535835403041000000 | HX:FILLN | 2018-09-01 20:56:43.041000+00:00 |
7116 | 1634062 | 1535845289557000000 | HX:FILLN | 2018-09-01 23:41:29.557000+00:00 |
7117 | 1634062 | 1535851030659000000 | HX:FILLN | 2018-09-02 01:17:10.659000+00:00 |
However, the timestamp values in the returned pandas DataFrame are of int64 dtype, and when a single value is fetched into a variable it comes back as a numpy.int64.
# Show the column dtypes of the query result (displayed as the cell output).
aadf.dtypes
nxcals_entity_id int64
nxcals_timestamp int64
nxcals_variable_name object
testTime datetime64[ns, UTC]
dtype: object
# Take the raw timestamps of two fills straight out of the DataFrame rows.
t1, t2 = (aadf.loc[_fill]['nxcals_timestamp'] for _fill in (7114, 7117))
print (f' -- types: {type(t1)}, {type(t2)}')
-- types: <class 'numpy.int64'>, <class 'numpy.int64'>
Using these values as input to a Data Query raises an error:
# --- using numpy.int64 gives an error
# This call is kept as-is on purpose: it demonstrates that t1/t2 (numpy.int64,
# as fetched from the DataFrame) are rejected by the query builder's startTime.
getFillNo( t1, t2)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-197-be5696247b21> in <module>()
1 # --- using numpy.int64 gives an error
----> 2 getFillNo( t1, t2)
<ipython-input-186-ce8ade6eccaa> in getFillNo(t1, t2)
12
13 def getFillNo(t1:int, t2:int) -> pd.DataFrame:
---> 14 ds = DataQuery.builder(spark).byVariables().system('CMW') .startTime(t1).endTime(t2) .variable('HX:FILLN').buildDataset()
15 ds.select('nxcals_timestamp','nxcals_value')
16 _tmpdf = ds.orderBy('nxcals_timestamp').toPandas()
/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/cern/nxcals/api/extraction/data/common.py in startTime(self, start_time)
53
54 def startTime(self, start_time):
---> 55 self._builder.startTime(nanos(start_time))
56 return EndStage(self._next_stage)
57
/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/cern/nxcals/api/extraction/data/common.py in execute(*args)
11 def __getattr__(self, name):
12 def execute(*args):
---> 13 self.__java_builder = getattr(self.__java_builder, name)(*args)
14 return execute
15
/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o3822.startTime.
: java.time.format.DateTimeParseException: Text '1535826483836000000' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1851)
at java.time.LocalDateTime.parse(LocalDateTime.java:492)
at cern.nxcals.api.utils.TimeUtils.getInstantFromString(TimeUtils.java:93)
at cern.nxcals.api.extraction.data.builders.fluent.TimeStartStage.startTime(TimeStartStage.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
Re-query the data, using as the start time the timestamp of each acquired row converted to datetime. In some cases the data is missing!
# For each of the first 11 fills, re-query a one-hour window that starts at the
# fill's own timestamp (given as a naive datetime) and check that the fill
# number is part of the result.
for i, row in aadf.head(11).iterrows():
    print(f'>> Fill {i} : startTime={row.testTime}')
    try:
        _auxtmp = getFillNo(row.testTime.tz_localize(None), (row.testTime+pd.Timedelta(1, 'h')).tz_localize(None))
    except Exception as err:
        # A failed query leaves nothing to check for this fill: report the cause
        # and move on instead of testing against a stale or undefined result.
        print(f'>>> error fetching data : {err!r}')
        continue
    if i not in _auxtmp.index.values:
        print (f' <<< ERROR missing data >>> fill numbers returned :- {_auxtmp.index.values}')
>> Fill 7113 : startTime=2018-09-01 11:33:12.522000+00:00
>> Fill 7114 : startTime=2018-09-01 18:28:03.836000+00:00
>> Fill 7115 : startTime=2018-09-01 20:56:43.041000+00:00
>> Fill 7116 : startTime=2018-09-01 23:41:29.557000+00:00
>> Fill 7117 : startTime=2018-09-02 01:17:10.659000+00:00
<<< ERROR missing data >>> fill numbers returned :- []
>> Fill 7118 : startTime=2018-09-02 07:09:40.929000+00:00
>> Fill 7119 : startTime=2018-09-02 18:11:48.417000+00:00
>> Fill 7120 : startTime=2018-09-02 19:39:05.835000+00:00
<<< ERROR missing data >>> fill numbers returned :- []
>> Fill 7121 : startTime=2018-09-03 03:07:41.480000+00:00
>> Fill 7122 : startTime=2018-09-03 04:25:18.926000+00:00
<<< ERROR missing data >>> fill numbers returned :- []
>> Fill 7123 : startTime=2018-09-03 13:06:32.089000+00:00
Using the UNIX timestamp directly works fine as expected!
# Same check as the datetime-based loop, but the one-hour window is given as
# integer UNIX timestamps [ns] taken directly from the acquired data.
_one_hour_ns = pd.Timedelta(1, 'h').value  # loop-invariant window length in ns
for i, row in aadf.head(11).iterrows():
    print(f'>> Fill {i} : startTime={row.testTime}')
    # Force a plain Python int: the query builder rejects numpy.int64 values.
    _start_ns = int(row.nxcals_timestamp)
    try:
        _auxtmp = getFillNo(_start_ns, _start_ns + _one_hour_ns)
    except Exception as err:
        # A failed query leaves nothing to check for this fill: report the cause
        # and move on instead of testing against a stale or undefined result.
        print(f'>>> error fetching data : {err!r}')
        continue
    if i not in _auxtmp.index.values:
        print (f' <<< ERROR missing data >>> fill numbers returned :- {_auxtmp.index.values}')
>> Fill 7113 : startTime=2018-09-01 11:33:12.522000+00:00
>> Fill 7114 : startTime=2018-09-01 18:28:03.836000+00:00
>> Fill 7115 : startTime=2018-09-01 20:56:43.041000+00:00
>> Fill 7116 : startTime=2018-09-01 23:41:29.557000+00:00
>> Fill 7117 : startTime=2018-09-02 01:17:10.659000+00:00
>> Fill 7118 : startTime=2018-09-02 07:09:40.929000+00:00
>> Fill 7119 : startTime=2018-09-02 18:11:48.417000+00:00
>> Fill 7120 : startTime=2018-09-02 19:39:05.835000+00:00
>> Fill 7121 : startTime=2018-09-03 03:07:41.480000+00:00
>> Fill 7122 : startTime=2018-09-03 04:25:18.926000+00:00
>> Fill 7123 : startTime=2018-09-03 13:06:32.089000+00:00