Skip to main content

Striim Platform 5.0 documentation

HL7v2 Parser

HL7’s Version 2.x (V2) is a messaging standard for electronic data exchange in the clinical domain and one of the most widely implemented healthcare standards in the world. It allows clinical data to be exchanged between systems and is designed to support both a central patient care system and a more distributed environment where data resides in departmental systems. As part of the Healthcare Data Initiative, the HL7v2 Parser is a new parser for Striim source adapters that is dedicated to parsing HL7v2 messages. It supports data in plain HL7v2 format as well as HL7v2 messages bundled using MLLP.

For more information on the standard, see HL7 V2 Product Suite.

This parser will generate XMLNodeEvents (a Striim-based event in the form of a DOM4J XML document).

Supported sources

The following sources are supported with the HL7v2 parser:

  • TCP Reader

  • Cloud based storage: S3 Reader, GCS Reader, and ADLS Reader

  • File based storage

  • Kafka Reader

After reading and parsing from a source, you can write to any target that supports the XMLNodeEvent (XML Formatter).

You can use a continuous query (CQ) to parse the XMLNodeEvent and retrieve the HL7v2 message content.

HL7v2 Parser properties

Property

type

default value

comments

MLLPDelimited

Boolean

False

This property specifies that data is shared in MLLP format. In TCP Reader you should enable this property if the communication is in MLLP format.

EnableMessageValidation

Boolean

False

Each HL7v2 message type requires certain segments and fields for a message to be considered valid. Set this property to True to have the parser apply the default message structure and field validation defined by the HL7v2 standard.

Sample applications for the HL7v2 parser

The following are sample applications for the HL7v2 parser:

-- Sample application: reads objects from a Google Cloud Storage bucket with
-- GCSReader, parses them with the HL7v2 parser, and sends the parser output
-- stream (Hl7v2Out) to a SysOut target.
CREATE OR REPLACE APPLICATION HL7v2;

-- GCS source. MLLPDelimited is false, so the data is not treated as MLLP format.
-- NOTE(review): ObjectFilter selects '*.zip' while compressiontype is 'gzip';
-- confirm that the objects in the bucket really are gzip-compressed.
-- NOTE(review): the parser is configured with DataValidation, but the
-- properties table on this page names the validation switch
-- EnableMessageValidation; confirm which name the parser accepts.
CREATE OR REPLACE SOURCE MyGoogleHL7v2Data USING Global.GCSReader (
    BucketName: 'nmp-hl7v2-striim',
    ConnectionRetryPolicy: 'retryInterval=30, maxRetries=3',
    ServiceAccountKey: '1482-4daa73e14474.json',
    PollingInterval: 5000,
    ProjectId: 'nitin-infra-1482',
    UseStreaming: false,
    DownloadPolicy: 'DiskLimit=2048,FileLimit=10',
    adapterName: 'GCSReader',
    IncludeSubfolders: true,
    ObjectDetectionMode: 'GCSDirectoryListing',
    dontBlockOnEOF: 'true',
    ObjectFilter: '*.zip',
    compressiontype: 'gzip'
)
PARSE USING Global.HL7v2Parser (
    handler: 'com.webaction.proc.HL7v2Parser',
    MLLPDelimited: false,
    DataValidation: false
)
OUTPUT TO Hl7v2Out;

-- SysOut target fed by the parser output stream.
CREATE TARGET Sysout USING Global.SysOut (
    name: HL7v2XMLEvent
)
INPUT FROM Hl7v2Out;

END APPLICATION HL7v2;

The following is a sample with a CQ and a BigQuery/Slack alert.

CREATE OR REPLACE APPLICATION HL7v2Demo;

-- Record type used by the PatientLookUp cache (declared "OF CacheFile").
-- All eight fields are strings.
CREATE OR REPLACE TYPE CacheFile (
    PAN       java.lang.String,
    FirstName java.lang.String,
    LastName  java.lang.String,
    Address   java.lang.String,
    City      java.lang.String,
    State     java.lang.String,
    Zip       java.lang.String,
    Gender    java.lang.String
);

-- TCPReader source configured with IPAddress 'localhost' and port 10000.
-- MLLPDelimited is true, which the properties table on this page says to
-- enable when the TCP communication is in MLLP format.
-- NOTE(review): the parser is configured with DataValidation, but the
-- properties table on this page names the validation switch
-- EnableMessageValidation; confirm which name the parser accepts.
CREATE SOURCE MLLP_HL7_Reader USING Global.TCPReader (
    blocksize: 64,
    AckMLLP: true,
    IPAddress: 'localhost',
    portno: 10000,
    maxconcurrentclients: 5
)
PARSE USING Global.HL7v2Parser (
    handler: 'com.webaction.proc.HL7v2Parser',
    MLLPDelimited: true,
    DataValidation: false
)
OUTPUT TO HL7_Stream;

-- Stream of Global.AlertEvent records: AlertEventCQ inserts into it and the
-- Slack_Alert target reads from it.
CREATE OR REPLACE STREAM AlertStream
    OF Global.AlertEvent;

-- Patient lookup cache: FileReader picks up HL7PatientLookUp.csv from the
-- configured directory, DSVParser splits it on ',' with a header row, and the
-- records are typed as CacheFile. keytomap is set to FirstName.
CREATE OR REPLACE CACHE PatientLookUp USING Global.FileReader (
    rolloverstyle: 'Default',
    blocksize: 64,
    skipbom: true,
    wildcard: 'HL7PatientLookUp.csv',
    directory: '/HOME/Healthcare/HL7v2/HL7',
    includesubdirectories: false,
    positionbyeof: false
)
PARSE USING Global.DSVParser (
    blockascompleterecord: false,
    charset: 'UTF-8',
    columndelimiter: ',',
    columndelimittill: '-1',
    handler: 'com.webaction.proc.DSVParser_1_0',
    header: true,
    headerlineno: 0,
    ignoreemptycolumn: false,
    ignoremultiplerecordbegin: 'true',
    ignorerowdelimiterinquote: false,
    linenumber: '-1',
    nocolumndelimiter: false,
    quoteset: '\"',
    rowdelimiter: '\n',
    separator: ':',
    trimquote: true,
    trimwhitespace: false
)
QUERY (
    keytomap: 'FirstName',
    skipinvalid: 'false'
)
OF CacheFile;

-- Extracts the patient's name from each ORU_R01 message on HL7_Stream and
-- forwards it, together with the full event, to PatientNameFilter_Stream.
-- In HL7 v2, PID.5 is an XPN (extended person name): XPN.1 is the family name
-- (its FN.1 subcomponent is the surname) and XPN.2 is the given name. LastName
-- is therefore read from XPN.1/FN.1 and FirstName from XPN.2.
-- Output column order (LastName, FirstName, data) is unchanged.
CREATE OR REPLACE CQ FilterPatientNameCQ
INSERT INTO PatientNameFilter_Stream
SELECT
    data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.PATIENT").element("PID").element("PID.5").element("XPN.1").element("FN.1").getText() AS LastName,
    data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.PATIENT").element("PID").element("PID.5").element("XPN.2").getText() AS FirstName,
    data AS data
FROM HL7_Stream p
WHERE data.getName() = "ORU_R01";

-- Slack target: posts the events arriving on AlertStream to the configured
-- channel.
-- NOTE(review): the OAuth token is embedded as a literal; presumably it is a
-- placeholder to be replaced with your own token. Confirm before reuse.
CREATE OR REPLACE TARGET Slack_Alert USING Global.SlackAlertAdapter (
    adapterName: 'SlackAlertAdapter',
    OauthToken: 'hK26UnbbLabc123JhkxvVcMaVgwLwfTEi8DCCKgYKfx8HtFuCYwc1R4nHosC3UcuYF8inSuso9abc123==',
    ChannelName: 'striim-slack-alert-test'
)
INPUT FROM AlertStream;

-- Matches the names extracted from incoming messages against the PatientLookUp
-- cache on both last and first name. Matching events go to Patient_Hits_Stream
-- with the address, city and state taken from the cache.
CREATE OR REPLACE CQ PatientFilterCQ
INSERT INTO Patient_Hits_Stream
SELECT
    pn.FirstName AS FirstName,
    pn.LastName  AS LastName,
    data         AS data,
    pl.Address   AS Address,
    pl.City      AS City,
    pl.State     AS State
FROM PatientNameFilter_Stream pn, PatientLookUp pl
WHERE pn.LastName = pl.LastName
  AND pn.FirstName = pl.FirstName;

-- Passes through only those events on Patient_Hits_Stream whose OBX.5 / CWE.5
-- element text equals "POSITIVE"; every field is forwarded unchanged.
CREATE CQ PositivePatientsCQ
INSERT INTO PositivePatients_Stream
SELECT *
FROM Patient_Hits_Stream hit
WHERE data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.ORDER_OBSERVATION").element("ORU_R01.OBSERVATION").element("OBX").element("OBX.5").element("CWE.5").getText() = "POSITIVE";

-- Builds one alert record per event on PositivePatients_Stream and inserts it
-- into AlertStream. name, keyVal, severity and flag are fixed literals; the
-- message combines the patient's name with the OBX.3 / CE.2 element text.
CREATE OR REPLACE CQ AlertEventCQ
INSERT INTO AlertStream
SELECT
    'Patient Found' AS name,
    '302'           AS keyVal,
    'info'          AS severity,
    'raise'         AS flag,
    'Flagged patient `' + pp.FirstName + ' ' + pp.LastName
        + '` found to be POSITIVE for the following test: '
        + data.element("ORU_R01.PATIENT_RESULT").element("ORU_R01.ORDER_OBSERVATION").element("ORU_R01.OBSERVATION").element("OBX").element("OBX.3").element("CE.2").getText() AS message
FROM PositivePatients_Stream pp;

-- Projects the name and address columns of each positive patient into
-- BQColumnStream, which is the input of the BigQuery target.
CREATE CQ GetPatientInfo
INSERT INTO BQColumnStream
SELECT
    pp.FirstName,
    pp.LastName,
    pp.Address,
    pp.City,
    pp.State
FROM PositivePatients_Stream pp;

-- BigQuery target: writes the events from BQColumnStream to the table
-- 'howardtest.Patient' in project 'striimexample', with Mode set to APPENDONLY.
CREATE OR REPLACE TARGET BigQuery USING Global.BigQueryWriter (
    ColumnDelimiter: '|',
    Tables: 'howardtest.Patient',
    NullMarker: 'NULL',
    streamingUpload: 'false',
    Encoding: 'UTF-8',
    ServiceAccountKey: 'Platform/UploadedFiles/admin/striimexample-john-doe.json',
    ConnectionRetryPolicy: 'totalTimeout=600, initialRetryDelay=10, retryDelayMultiplier=2.0, maxRetryDelay=60 , maxAttempts=5, jittered=True, initialRpcTimeout=10, rpcTimeoutMultiplier=2.0, maxRpcTimeout=30',
    projectId: 'striimexample',
    AllowQuotedNewLines: 'false',
    CDDLAction: 'Process',
    optimizedMerge: 'false',
    TransportOptions: 'connectionTimeout=300, readTimeout=120',
    adapterName: 'BigQueryWriter',
    Mode: 'APPENDONLY',
    StandardSQL: 'true',
    includeInsertId: 'true',
    QuoteCharacter: '\"',
    BatchPolicy: 'eventCount:1000000, Interval:30'
)
INPUT FROM BQColumnStream;

END APPLICATION HL7v2Demo;