
Using TimeStamps in NXCALS data

In this notebook I study timestamps in NXCALS and demonstrate issues I encountered.

I found an inconsistency in how timestamp data are handled, probably caused by incomplete type handling of the input variables, as the examples below demonstrate.

Initialization

from cern.nxcals.api.extraction.data.builders import *
from pyspark.sql.functions import col
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import DoubleType
from pyspark.sql.types import ArrayType

import pyspark.sql.functions as func
from pyspark.sql.functions import pandas_udf, PandasUDFType

import time
import numpy as np
import pandas as pd 
import os
from matplotlib import pyplot as plt

# sc - Spark Context
# spark - Spark Session in Memory

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark
SparkSession - in-memory
SparkContext: Spark UI
Version: v2.4.0
Master: yarn
AppName: pyspark_shell_swan

User functions

Helper functions to convert between Unix timestamps and a human-readable format. I prefer pandas functions to keep the analysis consistent.

def tstamp2datetime(tstamp):
    ''' Convert NXCALS acqStamp (UNIX timestamp, UTC, ns) to human readable time format (UTC pandas datetime) '''
    return pd.to_datetime(tstamp, unit='ns').tz_localize('UTC')

def datetime2tstamp(dattime):
    ''' Convert pandas datetime UTC to UNIX timestamp in nanoseconds '''
    return pd.Timestamp(dattime).value

def _datetime2tstamp(dattime):
    ''' Convert pandas datetime UTC to UNIX timestamp in nanoseconds '''
    return pd.Timestamp(dattime).to_datetime64().astype(int)

def getFillNo(t1:int, t2:int) -> pd.DataFrame:
    ''' Simple data query to get LHC fill numbers in a time window'''
    ds = DataQuery.builder(spark).byVariables().system('CMW') \
                .startTime(t1).endTime(t2) \
                .variable('HX:FILLN').buildDataset()
    # note: select() returns a new Dataset; the result is not assigned here,
    # so all columns are kept in the pandas conversion below
    ds.select('nxcals_timestamp','nxcals_value')
    _tmpdf = ds.orderBy('nxcals_timestamp').toPandas()
    # _tmpdf['testTime'] = _tmpdf['nxcals_timestamp'].apply(lambda x: pd.Timestamp(x, unit='ns').tz_localize('UTC'))
    _tmpdf['testTime'] = _tmpdf['nxcals_timestamp'].apply(tstamp2datetime)
    _tmpdf.rename(columns={'nxcals_value':'FillNo'},inplace=True)
    _tmpdf.set_index('FillNo', inplace=True)
    return _tmpdf
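The pandas post-processing inside `getFillNo` (timestamp conversion, column rename, re-indexing) can be exercised without an NXCALS session. A minimal sketch on a synthetic stand-in for the returned Dataset, with values copied from the query output shown later in this notebook:

```python
import pandas as pd

# synthetic stand-in for the DataQuery result (the real query needs
# an NXCALS Spark session); values copied from the output below
raw = pd.DataFrame({
    'nxcals_timestamp': [1535801592522000000, 1535826483836000000],
    'nxcals_value': [7113, 7114],
})
raw['testTime'] = raw['nxcals_timestamp'].apply(
    lambda x: pd.to_datetime(x, unit='ns').tz_localize('UTC'))
raw = raw.rename(columns={'nxcals_value': 'FillNo'}).set_index('FillNo')
print(raw.loc[7113, 'testTime'])   # 2018-09-01 11:33:12.522000+00:00
```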

Timestamps consistency checks

Check that: pandas timestamps provide [ns] resolution, and that converting back and forth to datetime preserves that precision.

its = 1535801592522000001
print (f' - Unix timestamp  [ns] :', its)
print (f' - tstamp2datetime      :', tstamp2datetime(its))
print (f' - datetime2tstamp      :', datetime2tstamp(tstamp2datetime(its)))
 - Unix timestamp  [ns] : 1535801592522000001
 - tstamp2datetime      : 2018-09-01 11:33:12.522000001+00:00
 - datetime2tstamp      : 1535801592522000001
print ('Types : ')
print (f' datetime2tstamp  = {type(datetime2tstamp(its))}')
print (f' datetime2tstamp2 = {type(_datetime2tstamp(its))}' )
Types : 
 datetime2tstamp  = <class 'int'>
 datetime2tstamp2 = <class 'numpy.int64'>
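The type difference matters later, when these values are fed back into a DataQuery. A minimal sketch (pure pandas/numpy, independent of NXCALS) showing that the round trip through `numpy.int64` preserves the nanosecond value, and that a plain `int()` cast recovers the builtin type:

```python
import numpy as np
import pandas as pd

its = 1535801592522000001                       # ns-precision UNIX timestamp
np_ts = pd.Timestamp(its, tz='UTC').to_datetime64().astype(np.int64)

assert np_ts == its                  # the value survives the round trip
assert not isinstance(np_ts, int)    # ...but the type is numpy.int64
assert isinstance(int(np_ts), int)   # a plain int() cast restores the builtin type
```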

Timestamps in NXCALS data queries

NXCALS data queries accept input times either as timezone-naive datetimes or as Unix timestamps.

However, there seems to be an inconsistency between the timestamp formats accepted by DataQuery and those returned in the pandas DataFrames.

tt1 = '2018-09-01 10:12:00'
tt2 = '2018-10-01 10:12:00'
# -- using timezone-naive pandas Timestamps
aadf = getFillNo(pd.Timestamp(tt1).tz_localize(None), pd.Timestamp(tt2).tz_localize(None))
aadf.head()
nxcals_entity_id nxcals_timestamp nxcals_variable_name testTime
FillNo
7113 1634062 1535801592522000000 HX:FILLN 2018-09-01 11:33:12.522000+00:00
7114 1634062 1535826483836000000 HX:FILLN 2018-09-01 18:28:03.836000+00:00
7115 1634062 1535835403041000000 HX:FILLN 2018-09-01 20:56:43.041000+00:00
7116 1634062 1535845289557000000 HX:FILLN 2018-09-01 23:41:29.557000+00:00
7117 1634062 1535851030659000000 HX:FILLN 2018-09-02 01:17:10.659000+00:00
# -- using Unix timestamp in int format
bbdf = getFillNo(datetime2tstamp(tt1), datetime2tstamp(tt2))
bbdf.head()
nxcals_entity_id nxcals_timestamp nxcals_variable_name testTime
FillNo
7113 1634062 1535801592522000000 HX:FILLN 2018-09-01 11:33:12.522000+00:00
7114 1634062 1535826483836000000 HX:FILLN 2018-09-01 18:28:03.836000+00:00
7115 1634062 1535835403041000000 HX:FILLN 2018-09-01 20:56:43.041000+00:00
7116 1634062 1535845289557000000 HX:FILLN 2018-09-01 23:41:29.557000+00:00
7117 1634062 1535851030659000000 HX:FILLN 2018-09-02 01:17:10.659000+00:00

However, the timestamp values in the returned pandas DataFrame are of dtype int64, and when a single value is fetched it comes back as numpy.int64.

aadf.dtypes
nxcals_entity_id                      int64
nxcals_timestamp                      int64
nxcals_variable_name                 object
testTime                datetime64[ns, UTC]
dtype: object
t1 = aadf.loc[7114].nxcals_timestamp
t2 = aadf.loc[7117].nxcals_timestamp
print (f' -- types: {type(t1)}, {type(t2)}')
 -- types: <class 'numpy.int64'>, <class 'numpy.int64'>

Using these values as input to a DataQuery raises an error:

# --- using numpy.int64 gives an error
getFillNo( t1, t2)
---------------------------------------------------------------------------

Py4JJavaError                             Traceback (most recent call last)

<ipython-input-197-be5696247b21> in <module>()
      1 # --- using numpy.int64 gives an error
----> 2 getFillNo( t1, t2)


<ipython-input-186-ce8ade6eccaa> in getFillNo(t1, t2)
     12 
     13 def getFillNo(t1:int, t2:int) -> pd.DataFrame:
---> 14     ds = DataQuery.builder(spark).byVariables().system('CMW')                 .startTime(t1).endTime(t2)                 .variable('HX:FILLN').buildDataset()
     15     ds.select('nxcals_timestamp','nxcals_value')
     16     _tmpdf = ds.orderBy('nxcals_timestamp').toPandas()


/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/cern/nxcals/api/extraction/data/common.py in startTime(self, start_time)
     53 
     54     def startTime(self, start_time):
---> 55         self._builder.startTime(nanos(start_time))
     56         return EndStage(self._next_stage)
     57


/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/cern/nxcals/api/extraction/data/common.py in execute(*args)
     11     def __getattr__(self, name):
     12         def execute(*args):
---> 13             self.__java_builder = getattr(self.__java_builder, name)(*args)
     14         return execute
     15


/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:


/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()


/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(


Py4JJavaError: An error occurred while calling o3822.startTime.
: java.time.format.DateTimeParseException: Text '1535826483836000000' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1851)
    at java.time.LocalDateTime.parse(LocalDateTime.java:492)
    at cern.nxcals.api.utils.TimeUtils.getInstantFromString(TimeUtils.java:93)
    at cern.nxcals.api.extraction.data.builders.fluent.TimeStartStage.startTime(TimeStartStage.java:23)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
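The traceback suggests that the numpy.int64 input is not recognised as an integer and falls through to a date-time string parsing path. A hypothetical reconstruction of that dispatch (`nanos_sketch` is my own illustration, not the real NXCALS `nanos()` helper):

```python
from datetime import datetime
import numpy as np

def nanos_sketch(t):
    """Hypothetical reconstruction of the dispatch the traceback suggests:
    a builtin int is taken as nanoseconds, while any other type is
    stringified and parsed as a date-time string (which is where the
    numpy.int64 input ends up, and fails)."""
    if isinstance(t, int):           # numpy.int64 fails this check
        return t
    return int(datetime.fromisoformat(str(t)).timestamp() * 1e9)

print(nanos_sketch(1535826483836000000))        # builtin int passes through
try:
    nanos_sketch(np.int64(1535826483836000000)) # falls to the string branch
except ValueError as e:
    print('parse error, as in the Py4J traceback:', e)
```

If this is indeed the mechanism, casting the value with `int()` before the query would avoid the string branch entirely.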

Re-query the data, using as start time the acquisition timestamp of the data converted to datetime. In some cases the data are missed!

j = 0
for i, row in aadf.iterrows():
    print(f'>> Fill {i} : startTime={row.testTime}')
    try :
        try:
            _auxtmp = getFillNo(row.testTime.tz_localize(None), (row.testTime+pd.Timedelta(1, 'h')).tz_localize(None))
        except :
            print('>>> error fetching data')
        assert i in _auxtmp.index.values
        # print (f'      included - {_auxtmp.index.values}')
    except:
        print (f' <<< ERROR missing data >>> fill numbers returned :- {_auxtmp.index.values}')
    j += 1
    if j > 10:
        break
>> Fill 7113 : startTime=2018-09-01 11:33:12.522000+00:00
>> Fill 7114 : startTime=2018-09-01 18:28:03.836000+00:00
>> Fill 7115 : startTime=2018-09-01 20:56:43.041000+00:00
>> Fill 7116 : startTime=2018-09-01 23:41:29.557000+00:00
>> Fill 7117 : startTime=2018-09-02 01:17:10.659000+00:00
 <<< ERROR missing data >>> fill numbers returned :- []
>> Fill 7118 : startTime=2018-09-02 07:09:40.929000+00:00
>> Fill 7119 : startTime=2018-09-02 18:11:48.417000+00:00
>> Fill 7120 : startTime=2018-09-02 19:39:05.835000+00:00
 <<< ERROR missing data >>> fill numbers returned :- []
>> Fill 7121 : startTime=2018-09-03 03:07:41.480000+00:00
>> Fill 7122 : startTime=2018-09-03 04:25:18.926000+00:00
 <<< ERROR missing data >>> fill numbers returned :- []
>> Fill 7123 : startTime=2018-09-03 13:06:32.089000+00:00

Using the Unix timestamps directly works as expected:

j = 0
for i, row in aadf.iterrows():
    print(f'>> Fill {i} : startTime={row.testTime}')
    try :
        try:
            _auxtmp = getFillNo(row.nxcals_timestamp, row.nxcals_timestamp+pd.Timedelta(1, 'h').value)
        except :
            print('>>> error fetching data')
        assert i in _auxtmp.index.values
        # print (f'      included - {_auxtmp.index.values}')
    except:
        print (f' <<< ERROR missing data >>> fill numbers returned :- {_auxtmp.index.values}')
    j += 1
    if j > 10:
        break
>> Fill 7113 : startTime=2018-09-01 11:33:12.522000+00:00
>> Fill 7114 : startTime=2018-09-01 18:28:03.836000+00:00
>> Fill 7115 : startTime=2018-09-01 20:56:43.041000+00:00
>> Fill 7116 : startTime=2018-09-01 23:41:29.557000+00:00
>> Fill 7117 : startTime=2018-09-02 01:17:10.659000+00:00
>> Fill 7118 : startTime=2018-09-02 07:09:40.929000+00:00
>> Fill 7119 : startTime=2018-09-02 18:11:48.417000+00:00
>> Fill 7120 : startTime=2018-09-02 19:39:05.835000+00:00
>> Fill 7121 : startTime=2018-09-03 03:07:41.480000+00:00
>> Fill 7122 : startTime=2018-09-03 04:25:18.926000+00:00
>> Fill 7123 : startTime=2018-09-03 13:06:32.089000+00:00