kafka cli

  1. kafka-topics

to list all existing topics

kafka-topics.sh –list –zookeeper 192.168.1.115:2181

or

kafka-topics.sh –zookeeper centos7:2181 –list
mynewtopic
mytesttopic

the list command can be given before or after the zookeeper details

to create a new topic

kafka-topics.sh –create –bootstrap-server kbrk1:9092 –replication-factor 3 –partitions 3 –topic javatest
Created topic javatest.

or

kafka-topics.sh –create –zookeeper centos7:2181 –replication-factor 3 –partitions 3 –topic javatest1
Created topic javatest1
.

i have three brokers so i cannot choose the replication factor to be be more than 3 in this case . also note you can either connect to zookeeper or one of the brokers and create the topic

to describe the topic , here is the command

kafka-topics.sh –zookeeper centos7:2181 –topic mynewtopic –describe
Topic: mynewtopic PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: mynewtopic Partition: 0 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
Topic: mynewtopic Partition: 1 Leader: 4 Replicas: 4,2,1 Isr: 4,2,1
Topic: mynewtopic Partition: 2 Leader: 1 Replicas: 1,2,4 Isr: 1,2,4

lets try and interpret this , there are 3 brokers – broker id 1,2 and 4.

the partition 0 of mynewtopic has the leader in 2 and the replicas and in sync replicas are in 2 , 4 and 1. Notice 2 the leader is listed first.

partition 1 leader is 4 and partition 2 leader is 1 , so kafka has distributed the partitions across the 3 brokers.

to delete the topic , here is the command

kafka-topics.sh –zookeeper centos7:2181 –topic mynewtopic –delete
Topic mynewtopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

so basically the topic is marked for deletion and will not show up in the list command

2. kafka-console-producer

the next set of commands are deal with producers

to produce messages from the console , type in the command below and it will come back with the “>” prompt …this is where we can manually type in messages

kafka-console-producer.sh –broker-list 192.168.1.105:9092 –topic mytesttopic

>first message

>second

>third

>and on and on  …

use ctrl-c to exit out of the prompt

lets now look at acks before we proceed with the next command. remember tcp syn acks etc , well this is something similar . When the producer writes a message to the broker , we can define if the producer need to receive acknowledgement back from the leader , from every in sync replica or not wait at all for either leader or followers . not waiting for ack would be the fastest , waiting just for the leader would gurantee atleast the leader got it and if producer waits for all the replicas to receive the update , then its going to be the slowest

acks=0,1,-1 -> no wait , wait for leader, wait for all

kafka-console-producer.sh –broker-list 192.168.1.105:9092 –topic mytesttopic –producer-property acks=all

producing to topic that has not been created will create a new topic with that name with the default settings

3.kafka-console-consumer

the previous command was to do with the producer , the new commands are on the receiving or consuming side

publish to a non existing topic

kafka-console-producer.sh –broker-list 192.168.1.105:9092 –topic sjvztopic –producer-property acks=all

test
[2020-07-10 17:41:41,251] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {sjvztopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sec
thir
4th
5th

using the from beginning option will list everything ( try this on another console)

kafka-console-consumer.sh –bootstrap-server kbrk1:9092 –topic sjvztopic -from-beginning
test
sec
thir
4th
5th

without the beginning it just prints whatever messages showed up after the consumer was configured

kafka-console-consumer.sh –bootstrap-server kbrk1:9092 –topic sjvztopic
4th
5th

if we give an addional parameter -group the console consumer becomes part of a consumer group. the offsets are tracked by the consumer group , so the from -beginning doesnot have an effect since the consumer group has already seen the messages so only the new messages will show up.

3. kafka-consumers-group.sh

you can use this to list , describe , reset offsets , shift offset for consumer groups

kafka-consumer-groups.sh –bootstrap-server kbrk2:9092 –list
newgroup

kafka-consumer-groups.sh –bootstrap-server kbrk2:9092 –describe –group newgroup

Consumer group ‘newgroup’ has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
newgroup sjvztopic 0 162 162 0 – – –

kafka-consumer-groups.sh –bootstrap-server kbrk2:9092 –group newgroup –reset-offsets –shift-by -2 –execute –topic sjvztopic

GROUP TOPIC PARTITION NEW-OFFSET
newgroup sjvztopic 0 160

see how the offset moved back by 2 from 162 to 160

kafka

this blog documents my home lab set up with kafka

See commands below for installing zookeeper

once zookeeper is installed , i installed kafka brokers

once i configure one kafka broker , i can copy the config and build the rest

install java and kafka binaries, kafka comes with zookeeper

yum install java-1.8.0-openjdk
wget
mkdir /kafka
cd /kafka
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
ll
export PATH=$PATH:/kafka/kafka_2.12-2.5.0/bin
mkdir /zookeper_data
pwd
cd kafka_2.12-2.5.0/
cd config
pwd
ls zookeeper.properties
cat zookeper.properties


zookeeper-server-start.sh zookeeper.properties

useradd zook -m
usermod –shell /bin/bash zook
passwd zook

chown zook:zook /zookeeper_data/

starting the zookeeper from console gives me this output

see the line that states info binding to port 0.0.0.0:2181 – this indicates that zookeeper is running

drwxr-xr-x. 2 root root 4096 Apr 7 21:13 windows
-rwxr-xr-x. 1 root root 867 Apr 7 21:13 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1393 Apr 7 21:13 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root 1001 Apr 7 21:13 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root 1017 Apr 7 21:13 zookeeper-shell.sh

in systemd i can specify the start and stop scripts

here is how the systemd service file is configured

cat /etc/systemd/system/zookeeper.service
[unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=zook
Group=zook
ExecStart=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /kafka/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target

status command gives the following output

[root@centos7 bin]# systemctl status zookeeper
● zookeeper.service
Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 13:43:11 EDT; 10min ago
Process: 8552 ExecStop=/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh (code=exited, status=1/FAILURE)
Main PID: 9866 (java)
CGroup: /system.slice/zookeeper.service
└─9866 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+Exp

i have 3 brokers these are kbrk1 , kbrk2, kbrk4

the broker id would be 1,2, and 4 respectively. 1 and 2 are in rack 1 and 4 is in rack 2

the log_dirs points to /home/kafka_data

this is the base directory where kafka broker stores the partition replica

kafka internally creates a topic for the offset itself , previously zookeeper used to track the offsets , but now its stored in kafka as a topic by itself . The parameter offsets.topic.num.partitions decide on how many partitions are used to store this. The default value is 50 and may be too high for test , we will store ours as 3

Default replication factor for the offset topic is 3

the minimum insync replica is 2 . default replication factor is 2 – this is used where topics are automatically create and the replication factor is not specified.

these can go in the server.properties file

offsets.topic.num.partitions=3
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=2
default.replication.factor=2

the zookeeper.connect should point to system where the zookeper is running.

all of these values are in the server.properties file

when i first started the kafka server i got this

1.115:2181: No route to host (org.apache.zookeeper.ClientCnxn)
^C[2020-07-09 15:47:38,926] INFO Terminating process due to signal SIGINT (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-07-09 15:47:38,937] INFO shutting down (kafka.server.KafkaServer)

this is with no route to host .

i used ncat to check

nc 192.168.1.115 2181
Ncat: No route to host.

since ncat gives the same error as i am seeing inside the host , this is a network issue

i disabled the firewall on zookeeper and ncat was able to connect

follow this direction

https://progressive-code.com/post/17/Setup-a-Kafka-cluster-with-3-nodes-on-CentOS-7

as far as ports go

kafka default ports:

  • 9092, can be changed on server.properties;

zookeeper default ports:

  • 2181 for client connections;
  • 2888 for follower(other zookeeper nodes) connections;
  • 3888 for inter nodes connections;

make sure to actually use the name that the service is configured with

firewall-cmd –permanent –add-service=ZooKeeper
Error: INVALID_SERVICE: ‘ZooKeeper’ not among existing services

ensure the service name matches exactly with the service – i.e its case sensitive ..see same command with the case corrected.

firewall-cmd –permanent –add-service=zookeeper
success

a restart of the firewalld is required after this change

sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service

restarted the broker and this time it did start

[2020-07-09 16:56:35,447] INFO [KafkaServer id=4] started (kafka.server.KafkaServer)

the id =4 is what i assigned to this particular broker

i need to create a service – /etc/systemd/system/kafka.service

[root@kbrk4 config]# cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target kafka-zookeeper.service

[Service]
Type=simple
User=kafkabg
Group=kafkabg
ExecStart=/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh /kafka/kafka_2.12-2.5.0/config/server.properties
ExecStop=/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

systemctl start kafka.service

systemctl status kafka.service
● kafka.service – Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 17:30:04 EDT; 9s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 29107 (java)
CGroup: /system.slice/kafka.service
└─29107 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…


Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Jul 09 17:30:08 kbrk4 kafka-server-start.sh[29107]: [2020-07-09 17:30:08,…
Hint: Some lines were ellipsized, use -l to show in full.

cat kafka.xml


[root@kbrk4 services]# ls -al
total 4
drwxr-x—. 2 root root 23 Jul 9 17:54 .
drwxr-x—. 7 root root 133 Jan 1 2018 ..
-rw-r–r–. 1 root root 178 Jul 9 17:54 kafka.xml


sudo service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
# firewall-cmd –permanent –add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd –list-services
ssh dhcpv6-client kafka
[root@kbrk4 services]#

as you can see kafka is listed in the services

as of now , we have updated

  1. Server properties – for the broker configuration
  2. kafka.xml for the firewall configuration
  3. kafka.service for setting up systemctl

we can now copy these files to other broker nodes and all we need to do is to change the broker id

once the files are copied i can start the service and also make sure the firewall rules are in place , and i need to make sure the users are created to start the service

useradd kafkabg -m
mkdir /home/kafka_data
passwd kafkabg
chown -R kafkabg:kafkabg /home/kafka_data/

add firewall

firewall-cmd –list-service
ssh dhcpv6-client
firewall-cmd –permanent –add-service=kafka
success
service firewalld restart
Redirecting to /bin/systemctl restart firewalld.service
firewall-cmd –list-service
ssh dhcpv6-client kafka

start kafka on the brokers

systemctl start kafka.service
systemctl status kafka.service
● kafka.service – Apache Kafka server (broker)
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2020-07-09 20:03:44 EDT; 7s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 11940 (java)
CGroup: /system.slice/kafka.service
└─11940 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPause…

Jul 09 20:03:47 kbrk2 kafka-server-start.sh[11940]: [2020-07-09 20:03:47,…

we now have kafka -> zookeper and brokers running !

test connection to the cluster from the zookeeper

]# zookeeper-shell.sh 192.168.1.115 ls /brokers/ids
Connecting to 192.168.1.115

WATCHER::

atchedEvent state:SyncConnected type:None path:null
[1, 2, 4]

use the kafka-topics.sh to create a topic

kafka-topics.sh –create –zookeeper 192.168.1.115:2181 –replication-factor 3 –partitions 3 –topic mytesttopic
Created topic mytesttopic.

we can list the topics with the command below

kafka-topics.sh –list –zookeeper 192.168.1.115:2181
mytesttopic

as you can see it comes back with the topic we created with the previous command

we will use the kafka-console-producer script to produce some messages

kafka-console-producer.sh –broker-list 192.168.1.105:9092 –topic mytesttopic

Azure Stream analytics – window functions

Lets go over what Azure Stream Analytics is and then we will look at specifically Window functions, statistical functions , and scaling functions in Azure Stream analytics

  • Azure stream analytics can essentially
    • intake millions of events per second t variable loads
    • perform real time analytics on continuous streams of data
    • connect with Event hub for stream ingestion , and Azure blob for historical service
    • output to power BI as an output within Azure Stream analytics

Basically Azure Stream analytics can take inputs from Event Hubs, Iot Hubs or Blob storage and process it with SQL based query and then push it to SQL server, Power BI, Data Lake Storage, Cosmos DB, Service Bus , Synapse , function etc

Steps to configure

Set up a stream analytics job – this gives you a few options – Hosting environment – this can be cloud or Edge – You can use edge only if you are deploying it on an on-premise IOT gateway edge device and the other option is Streaming units which is an abstraction of the computation resources available to process the query . You can choose to store all the data directly into a data lake if we select the secure all private data assets in the storage account

there is a section where you can write the SQL query . a subset of t-sql is supported in Azure Stream analytics

lets take a look at the window function in Azure Stream analytics

Window functions can be used in the group by section of the sql query. The simplest of these window functions is the Tumbling window . If you want an average of all the temp recordings over a window of 10 seconds , then you essentially defined a window thats based on a time window . The window ends when the time ends , so essentially there is no overlap , an event can only be in one tumbling window.

if however you want to see moving average , lets say the moving average of price of security over 10 seconds with a hop of 2 seconds , then the window slides 2 seconds , but the new 10 second window is essentially the last 8 seconds and the next 2 seconds . This is the Hopping window . The tumbling window is essentially the hopping window where the hop size is the same size as the window size.

The sliding window is tricky to understand , lets say you have an event stream that has a variable speed, so instead of hopping every 2 seconds like the hopping window , what if the hop is based on when the event happens. take a look at this example

https://www.oreilly.com/library/view/stream-analytics-with/9781788395908/87d7eea1-cf76-42a9-91ed-b68d9364febf.xhtml

all of these windows that we have seen so far has all been fixed time length window.

A session window instead is based on grouping events together if they happen within the timeout window specified. if the timeout exceeds , the window is closed and new window is opened. if we need the window to be grouped based on keys , then events are grouped by key and session window is applied to each group independently.

on a final note , if we want to calculate the moving average every 10 seconds as well as every 30 seconds and every 60 seconds , you can use the Windows() function to apply multiple windows to the same stream. The windows function accepts an ID as the identity of the window definition and then results can be grouped based on this id.

Polybase

This article will go over the steps to load data into Azure SQL DW with polybase

We will first start off with what is polybase and then get into the details on how to use it

polybase is Microsoft’s solution to getting SQL server and Hadoop to be friends and have a jolly good time … I know this definition is deeply technical …( you are welcome ! )

polybase will allow us to run SQL queries on data stored in Hadoop, so if you have some data in SQL and you want to combine this with data that is in HDFS , Azure Blob storage , Azure Data Lake etc and give you a single interface to run these queries. polybase allows these external sources to be used from the Sql server environment.

Polybase is also microsofts recommended way of loading data from Azure Data Lake to Azure SQL warehouse. The ability to send projection down to the underlying Hadoops distributed architecture as well as the ability to scale out gives us the opportunity to optimize our load times

ok now that we have covered what polybase , lets see how we can use it to load data into data warehouse.

First steps first

Lets get a sql login created and a corresponding sql user created . just to go over the basic quickly – a sql login allows you to connect to the SQL server instance and then users are granted the permissions to the databases hosted on that sql server.

Here are the command to create a login and associated user

CREATE LOGIN Loadersjvzlogin  WITH PASSWORD = ‘a123STRONGpassword!’;

CREATE USER Loadersjvzuser FOR LOGIN Loadersjvzlogin;

We now need to grant control to the DW for this particular user

GRANT CONTROL ON DATABASE:: [sjvzdwpool] to Loadersjvzuser;

We then need to add the user to an appropriate resource class.

EXEC sp_addrolemember ‘staticrc60’, ‘Loadersjvzuser’;

so what is a resource class – glad you asked – resource classes are used to manage memory and concurrency for Synapse SQL pool queries in Azure synapse – higher the resources , less concurrency so you really want to balance and distribute the users amongst these different resource classes.

its always a good practice to create a separate user for the loader and assign a static resource class to the loader. CREATE TABLE uses clustered columnstore indexes by default. Compressing data into a columnstore index is a memory-intensive operation, and memory pressure can reduce the index quality. Memory pressure can lead to needing a higher resource class when loading data. To ensure loads have enough memory, you can create a user that is designated for running loads and assign that user to a higher resource class.

Polybase does allow external data to be loaded into on-prem SQL or Azure SQL Datawarehouse but Azure SQL is not supported as of this time

the next step is to create a Master Key

CREATE MASTER KEY WITH ENCRYPTION BY PASSWORD = ‘mypwd’

The database master key is a symmetric key used to protect the private keys of certificates and asymmetric keys that are present in the database. When it is created, the master key is encrypted by using the AES_256 algorithm and a user-supplied password.

one way to check the keys is to run the command below

select * from sys.symmetric_keys

if you are trying all this on your local desktop , you may want to install Polybase feature on your laptop

Machine generated alternative text:
SQL Server 2019 Setup 
PolyBase Configuration 
Specify Poly3ase scale-out option and port range. 
Global Rules 
Microsoft Update 
Product Updates 
Install Setup Files 
Install Rules 
Installation Type 
Feature Selection 
Feature Rules 
Poly Base Configuration 
Java Install Location 
Server Configuration 
Analysis Services Configuration 
Integration Services Scale Out 
Integration Services Scale Out 
Consent to install Microsoft R 
Consent to install Python 
Feature Configuration Rules 
Ready to Install 
Installation Progress 
use this SQL Server as standalone Poly3ase-enabIed instance. 
Choose this option to use this SQL Server instance as a standalone Head node. 
use this SQL Sewer as a pat of Poly3ase scale-out group. 
Choose this option to use this SQL Server instance as a Compute node in a Poly8ase Scale-out 
group. To ensure that pur PolyBase scale-out group can be configured after installation, make sure 
that the head node is on enterprise license of SQL Server 201 g. Selecting this option will open 
Firewall on this machine to allow incoming connections to SQL Server Database Engine, SQL Server 
Poly3ase services and SQL Browser. Selecting this option will also enable MSDTC firewall 
connections and modify MSDTC registry settings. 
Specify a port range for Poly3ase services (6 or more ports): 
16430 16460

Now we are ready for steps that are specific to mount the externals

Here are the high level steps

  1. Create a Database scoped credential with the storage account key

CREATE DATABASE SCOPED CREDENTIAL ADLS_credential

WITH — IDENTITY = ‘<storage_account_name>’ ,

— SECRET = ‘<storage_account_key>’ ;

2. Create an External DataSource

Create External File format

CREATE EXTERNAL FILE FORMAT parquetfileformat 

WITH ( 

    FORMAT_TYPE = PARQUET, 

      DATA_COMPRESSION = ‘org.apache.hadoop.io.compress.SnappyCodec’ 

  );

Create Schema

CREATE SCHEMA  twh;

Create External Table

CREATE EXTERNAL TABLE [exttravel].[itineraries]

(

[session_key] [nvarchar](75) NOT NULL,

[outbound_leg_id] [nvarchar](75) NOT NULL,

[inbound_leg_id] [nvarchar](75) NOT NULL

)

WITH (DATA_SOURCE = [traveldataadlssrc],LOCATION = N’2020/06/20/17/loaditineraries’,FILE_FORMAT = [parquetfileformat],REJECT_TYPE = VALUE,REJECT_VALUE = 0)

GO

and finally CTAS – which is essentially CREATE TABLE AS SELECT

CREATE TABLE twh.loaditineraries
WITH
(
DISTRIBUTION = REPLICATE,
CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT * FROM exttravel.itineraries
OPTION (LABEL = ‘polybaseloadfromiteneraries’) ;

we are done creating our first load , if you see in the image below , we have created a new table in sql warehouse thats based on the external storage

Dynamic Data masking in Azure synapse

This feature helps us set masking rules so sensitive data can be masked with a bunch of XXXX’s , so that column level security does not force you to change the schema

the unfortunate thing is that we don’t get to set it using the Azure portal for Azure synapse . We will need to use the REST API or the CLI for the same

Lets look at the rules before we look at the CLI – see pic below to see the options we get to Data Masking . in the case below , the random number range is greyed out because the column selected is not numeric.

Masking rules and functions

Masking functionMasking logic
DefaultFull masking according to the data types of the designated fields

• Use XXXX or fewer Xs if the size of the field is less than 4 characters for string data types (nchar, ntext, nvarchar).
• Use a zero value for numeric data types (bigint, bit, decimal, int, money, numeric, smallint, smallmoney, tinyint, float, real).
• Use 01-01-1900 for date/time data types (date, datetime2, datetime, datetimeoffset, smalldatetime, time).
• For SQL variant, the default value of the current type is used.
• For XML the document <masked/> is used.
• Use an empty value for special data types (timestamp table, hierarchyid, GUID, binary, image, varbinary spatial types).
Credit cardMasking method, which exposes the last four digits of the designated fields and adds a constant string as a prefix in the form of a credit card.

XXXX-XXXX-XXXX-1234
EmailMasking method, which exposes the first letter and replaces the domain with XXX.com using a constant string prefix in the form of an email address.

aXX@XXXX.com
Random numberMasking method, which generates a random number according to the selected boundaries and actual data types. If the designated boundaries are equal, then the masking function is a constant number.

Navigation pane
Custom textMasking method, which exposes the first and last characters and adds a custom padding string in the middle. If the original string is shorter than the exposed prefix and suffix, only the padding string is used.
prefix[padding]suffix

Navigation pane

now lets look at how to set this on Azure synapse using CLI

  • we will use power shell to connect
  • microsoft has provided some Azure cmdlets in power shell
  • you need to install the module az first , open up power shell ISE and enter the command below
  • Install-Module -Name Az -AllowClobber -Scope CurrentUser
  • run these commands below

PS C:\WINDOWS\system32> Connect-AzAccount

Account SubscriptionName TenantId
——- —————- ——–
xxxx@outlook.com Visual Studio Enterprise 55xxxx

PS C:\WINDOWS\system32> Get-AzSqlDatabaseDataMaskingPolicy -ResourceGroupName “sjvzdw” -ServerName “sjvzdwsrvr” -DatabaseName “sjvzdwpool”

DatabaseName : sjvzdwpool
ResourceGroupName : sjvzdw
ServerName : sjvzdwsrvr
DataMaskingState : Disabled
PrivilegedUsers :

  • As you can see the DataMaskingState is Disabled
  • Now run this command to create a DataMasking rule
  • New-AzSqlDatabaseDataMaskingRule -ResourceGroupName “sjvzdw” -ServerName “sjvzdwsrvr” -DatabaseName “sjvzdwpool” -SchemaName “twh” -TableName “loaditineraries” -ColumnName “session_key” -MaskingFunction “Default”

  • The following masking function values are allowed
    • NoMasking
    • Default
    • Text
    • Number
    • SocialSecurityNumber
    • CreditCardNumber
    • Email

if you rerun the Get command again , you will see that the DataMasking State has been enabled

References –

https://docs.microsoft.com/en-us/azure/azure-sql/database/dynamic-data-masking-overview?toc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Ftoc.json&bc=%2Fazure%2Fsynapse-analytics%2Fsql-data-warehouse%2Fbreadcrumb%2Ftoc.json

https://docs.microsoft.com/en-us/powershell/module/az.sql/new-azsqldatabasedatamaskingrule?view=azps-4.3.0

scala intro

Just some random notes about scala that you would notice as you start learning scala ( this is fairly long post )

scala support first class functions – this means scala treats functions just like variables – you can pass it to another function as an argument

it supports closures – A closure is a function, whose return value depends on the value of one or more variables declared outside this function

it supports currying – which is where a funtion that takes multiple arguements can be written as if it takes one argument and then a chain of calls to bring the same functionality as earlier , this makes it easier to pass the function as a first class function.

it supports pattern matching – case classes

Scala has a strong type inference , so its statically typed but we can leverage the type inference

scala> var radi = 10

radi: Int = 10

scala> var raii = 10.0

raii: Double = 10.0

in the case below , scala inferred the types based on the values . The compiler is smart enough to check the type

scala> var radi:Int = 10.0

                      ^

       error: type mismatch;

        found   : Double(10.0)

        required: Int

use triple quotes “””” for multiline strings. “””

you can use == operator to compare strings

String interpolation is supported , so scala will substitute the variable in the string thats delimited by $. need a ‘s’ at the beginning of the string

s” hello $valword”

the $valword will be replaced by whatever value it has . you can use formula with {} . for printf , precede the string with a ‘f’ and then the format with the % command

scala has a unified type system, java has the value and reference type , primitives like integer etc are passed by value whereas String , collections etc are object and passed by reference.

everything is an object in scala – no primitives , no wrapper types etc – functions are objects too – first class function

Scala solves this conundrum by declaring superclass – two superclass Anyval and AnyRef and then these have another class called Any …see this for better idea –https://docs.scala-lang.org/resources/images/tour/unified-types-diagram.svg

null in java and scala are the same , it can be used as a value for a reference type but not for value type . Null is a trait ie a type and not a value , null is a type of Null. Nothing is also a trait . Nothing can never be instantiated but Nothing extends everything because its a subtype of both AnyVal and AnyRef. Nil is a special value associated with an empty list. Nil is the singleton instance of List[Nothing]. List are singly linked list in scala and Nil is what the last element points to. None is a special value associated with an option , so if the function does return a value sometimes…. great , if not it can return None. Unit is similar to void in java as it is the return type of a function that returns nothing for instance

type operations -> asInstanceOf – similar to cast in java , however its better to use to<Type> e.g toLong – its class specific , and calls the system converter and its better .

you can use boolean method isInstanceOf[class] to check the type . you can use getClass to get the actual class

scala can be compiled or interpreted (remember the repl loop in intellij )

You can directly create a scala object as opposed to a class

A scala object corresponds to a static class in java

So  object o creates a singleton object 0 as the instance of some anonymous class , it can be used to hold static members that are not associated with instances of some class

main is the entry point just like java , but the same can be achieved by extending App which is a trait.

Object o extends T  makes the object 0 an instance of Trait T ;  you can then pass o anywhere , a T is expected.

Traits in Scala are like partially implemented interfaces. It may contain abstract and non-abstract methods. It may be that all methods are abstract, but it should have at least one abstract method. Not only are they similar to Java interfaces, Scala compiles them into those with corresponding implementation classes holding any methods implemented in the traits.

You can say that using Scala trait, we can share interfaces and fields between classes. Within trait, we can declare variables and values. When we do not initialize them, they are abstract. In other cases, the implementing class for the trait internally implements them.

Unlike Java , Traits can be extended by an instance of the class and not just by the class itself. you can extend multiple traits thus giving the impression of multiple inheritance

val is immutable , this keeps the code safe for concurrent distributed applications. Var is mutable , better to use Val instead .

Don’t use the return keyword in scala  ,  the last value is the return value in scala code  , so it automatically picks it up  …using return will allow you to compile the code , but its not advised.

Here is a good blog describing why we should not use return

https://tpolecat.github.io/2014/05/09/return.html

in scala – you either have a statement ( which does not return anything ) or expression ( which does return something ). A lot of stuff in java thats considered statement in java is considered as expression in scala . You can chain multiple expressions in an expression block thats enclosed in curly brackets – {} . The return type is whats the last expression returns within an expression block.

Scala does allow access to variables defined outside of the expression block ,this is what enables closure. You can have nested expression blocks and in cases of a var defined with the same name and runs into conflict the one closest in the scope is picked , this applies to return – the one that’s most inside defines the return type of a nested expression block

if/else is an expression in scala and if there is no else ( pun intended ) then the return value is Nothing , this is why type inference could infer the value to be Any.

match expressions are more common in scala than nested if else or switch-case in java, this is because match expression can match on value , type etc . match with case for each condition gives a way to conditionally initialize a val.

you can use OR ( syntax is the pipe symbol – ” | ” ) or patternGurad thats a if expression inside of a case expression , so you could have the same Case statement repeated with a different if expression .

val typeofDay = dayofWeek match{
    case "Monday" | "Tuesday" => "Working Day"
    case "Saturday"  => "Sabbath"
   }

in java you have default to do the catch everything else, Scala expands upon this option by giving us two options , you can either declare another case stmt with another variable that catches the default value that can be used in the expression or you can use the default wild card operator “_”

case someothervalue => s"This $somethervalue could be sunday or other working day"

or 

case _ => s"This $dayoftheweek could be sunday or other day"

notice i cannot use the $_ to refer to the variable in the expression

a for loop s just like java , it just iterates through the list so its a statement , recollect statements are code that does not return anything, if you add a “yield” statement , then scala considers the for loop to be an expression which then returns a value for each of the iteration

for(item <- iteminlist){
        item match {
             case "Mango" =>println("Fruit")
             case  everythingelse => println("it could be a fruit orsomething else")
}

// the above stmt doesnot return anything it just prints - the stmt below // returns a list

val new list = for(item <- iteminlist) yield 
{
item match {
case "Mango" => "Fruit"
case everythingelse => "it could be a fruit or something else"
}

this will return a list 
List(Fruit, ....)

in for loop you can use to for the index and it includes the last number in the range or until and it does not include the last number in the range.

while and do while statements are not preferred in scala , its a statement not an expr , the incremental variable has to be a var so it can b incremented and it doesnt fit the functional programming paradigm.

functions are first class citizens which means its an object in itself so it can be passed to other methods , function , store it in a val , return a function etc …all these things you cannot do with a method. methods are declared inside a class . Here is how you would declare the two

class test { 
      def m1(x:Int) = x+3 
      val f1 = (x:Int) => x+3
 }

methods have a “def” and then the name and then an = sign and optionally curly brackets for the expression block.

you can easily convert scala method to function …there are two ways one is by specifying the method signature and the other by wildcard “_” appended to the method ..this is called as eta and scala compiler understands and converts the method to function.

 we can convert method m2 to function f2 as follows 

def m2(x:Int):Int = { x+ 3}
m2: (x: Int)Int
scala> val f1 = (x:Int)=> {x + 3.0}:Double
f1: Int => Double = $$Lambda$840/0x0000000801068840@26bb92e2
scala> val f2:(Int) => Int = m2
f2: Int => Int = $$Lambda$842/0x000000080106a040@77a2688d

you see both function f1 and f2 are of the type Lambda , prior version of scala would make it of type function1 or functionn where n = 1..23  

the other more common way is the eta expansion 

scala> val f3 = m2 _
f3: Int => Int = $$Lambda$843/0x000000080106a840@9499643

note the right side is the name of the method -m2 , space character and then " _"  wild card character 

with named method parameters , the method can be invoked out of order , since we are referring to parameters by name , see below

scala> def m3(num:Int, dem:Int):Float = {num/dem}
m3: (num: Int, dem: Int)Float
scala> m3(2,3)
res15: Float = 0.0
scala> m3(3,2)
res16: Float = 1.0
scala> m3(dem=2,num=3)
res17: Float = 1.0
scala>
in this case i am passing out of order , but scala knows because of named parameter ,  this works only with methods not with function

you can define a default value for a parameter in the method , so you dont have to pass the parameter. Java does not support default parameter and you have to use function overloading in java to get that same feature

parametric polymorphism is same as generics in java with Type parameter. Type parameters are passed in square brackets [] unlike method parameters which are sent in round brackets ()

type parameters only work with methods and not functions ..so if you convert such methods to functions with the eta expansion , it loses the type and converts everything to Any object so we have essentially lost type safety. The key here is to

scala> def printpairTypes[K,V](k:K,v:V) = {
     | val keytype = k.getClass
     | val valtype = v.getClass
     | println(s"$k, $v are of types $keytype $valtype")
     | }
printpairTypes: [K, V](k: K, v: V)Unit

scala> printpairTypes(10,"ten")
10, ten are of types class java.lang.Integer class java.lang.String

val printfn = printpairTypes _
printfn: (Any, Any) => Unit = $$Lambda$1020/0x000000080111d840@70e75151

converting to function automatically converts to ( Any , Any ) and we have lost type safety, if instead we define the types explicitly , it picks it up

scala> val printfn1 = printpairTypes[Int, String] _
printfn1: (Int, String) => Unit = $$Lambda$1028/0x0000000801128840@426ba1d5

varargs or variable arguments can be passed to a method ( not supported in functions ) by specifying * next to the type ..see example below


scala> def concatStrings(strings:String*) = {
     | var concatanetedstr = ""
     | for (s <- strings) concatanetedstr = concatanetedstr + "" + s
     | concatanetedstr
     | }
concatStrings: (strings: String*)String


scala> concatStrings("Hello" , "World" , "From" , "Ella")
res23: String = HelloWorldFromElla

( i am trying a new wordpress plugin for syntax highlighter midway through the post , so the code section can be a bit more readable …whats life without constant change )

concatStrings("Hello" , "World" )
res24: String = HelloWorld

so the same method is invoked with different number of parameters or variable arguement or vararg

procedures are named reusable statements , functions are named reusable expressions and since statements dont return anything , procedure returns unit which is equivalent to void in java

we can define methods with or without parantheses …its allowed and makes it look like fields

nested functions are allowed , we can return a tuple of values from the outer function

Higher order function can take a function as an argument or return a function. you can pass parameters and the function that needs to be invoked as another parameter to another function in this case a higher order function . similarly if the last statement in a function is an anonymous function then that become the return value so you return a function. This helps with currying. i have described a lot in the last two statements. This needs a separate post by itself.

here is a sample higher order function

def math(x:Double,y:Double,fn:(Double,Double)=>Double):Double = {fn(x,y)}
math: (x: Double, y: Double, fn: (Double, Double) => Double)Double

scala> val result  = math(10,20,(a,b)=>a + b)
result: Double = 30.0

// in the case above we are passing the function ( or rather the function definition ) // to the math function as is the case below 


scala> val f2 = (a:Double,b:Double) => a + b
f2: (Double, Double) => Double = $$Lambda$907/0x00000008010a0840@6f4adaab

scala> val result  = math(10,20,f2)
result: Double = 30.0

// in this case math is a higher order function since it accepts a function f2 as a parameter

partially applied function is way to fix or bound one or many parameters of an existing function ,this promotes code reuse . in Java we would use overloading to achieve the same functionality .

you can group similar parameters into parameter groups , so you can have functions that take in multiple parameter groups fn()() …this idea of writing a function with multiple parameter groups is called Currying

you can specify any of the parameter groups and you get partially applied functions ..

closure = nested function + all of the variables local to the outer scope ( Referencing environment )

reading json in spark

reading a typical json file in spark may fail . it will say something like corrupt records ….spark looks for a corrupt record column in the schema where it can dump jsons that are not correctly parsed , if it doesnt find the same , it will fail .

here is one way to handle this problem

https://docs.azuredatabricks.net/_static/notebooks/read-csv-corrupt-record.html

the other option is to convert the json such that each line is a json object , the json object is not sperated by “,” and the parser can expect a complete json in each line

this behavior is because in a system like Hive the json objects are stored as values of a single column.

see code below

#File location and type

file_location = “/FileStore/tables/trythisjsonfmtd.json”
file_type = “json”

#CSV options

infer_schema = “true”
first_row_is_header = “false”
delimiter = “,”

#The applied options are for CSV files. For other file types, these will be ignored.

df = spark.read.format(file_type) \
.option(“inferSchema”, infer_schema) \
.option(“header”, first_row_is_header) \
.option(“sep”, delimiter) \
.load(file_location)

display(df)

linked service to Azure SQL

lets start with the basic steps to assign permissions for Azure Data Factory to have access to Azure SQL.

once you have the azure SQL server/ Database created you need to

  1. Create a AD user that has been given admin permission to the Azure SQL server
Ad users created in AAD

2. the next step is to make this user as the admin on the SQL server – use the set admin

3. log into SQL Server management studio using the AD user credentials . The reason you need to do step 1 and 2 , is that you cannot grant access to a Azure AD account from a SQL log in . you will get this error below

Only connections established with Active Directory accounts can create other Active Directory users.

so once you log into SQL management studio with the AD account , you can create a user account for the data factory and assign a role

CREATE USER testdfsjvz FROM EXTERNAL PROVIDER;

alter role db_writer ADD MEMBER [testdfsjvz]

4. Create a linked service in Azure Data Factory and test connection

and there you have it , you just created an linked service to Azure SQL

Replication for storage account in Azure

When you create a storage account in Azure , you have the option of selecting different kinds of replication strategy to ensure availability. The options presented are fairly straightforward to understand

You have all these options , the locally redundant storage would be the cheapest , but it doesn’t give you a failover option. Use this for your least important applications or environments.

azure blob

Blobs are binary large object , traditionally we had to either store them either inside a filesystem or inside a database , but with REST protocol , we now have a new way to access bits that are encapsulated by some metadata. All we need is a http server to answer to the rest api calls and you can get access to your group of bits or BLOB ….so thats it Azure BLOB is microsofts offering of Object storage in the cloud . When you add a layer of filesystem on top of it , you get Azure Data Lake storage …because its a filesystem …you now can have hierarchies or folders with sub folders with some more sub folders and so on and so forth …..the Blob storage is flat names space …you define your storage account name , your container and then pour your blobs into that container .

as you can see from the picture above once you are in the container , you don’t have access to create a second container , all you have is access to upload a file into this container. So if you want to store your objects and organize it neatly and manage it , then ADLS is your answer.

With Blob or object storage , you are not limited by the filesystem limitations – like the inode table etc …you just specify the exact identifier and it gets it back to you …. much easier so a lot more scalable …with ADLS you get the best of both worlds …the ABFS driver makes the rest call to the underlying blob storage and fetches the data and get it you

you cannot access the blob objects that are in data lake through a REST api call to underlying blob layer and access it using the filesystem …this is just way too risky ….this is not allowed.

( this is my understanding , its over simplified …feel free to add more clarity )

Azure cost management

one of the good things about azure cost management is that it gives reports whats costing you all that money ….i still remember 10 years ago , where we had built a CMDB and using that for cost management and it became an expensive task just to get insight on the costs for the on- prem infrastructure ….in Azure it takes a few clicks and you have the cost analysis ready

Here is a report from one of my subscription

i had spent the bulk of money in running an app service which i never really used

this report quickly helped me narrow down what the costs were, where it was running etc .

i quickly deleted the resource and i was good ….

Steps to publish an ADF pipeline

  1. Make changes in the development branch
  2. Submit a PR request
  3. Inspect the code changes
  4. Approve the PR request

This will merge the Pull request  – which is essentially taking the dev branch and merging it into the master branch

Machine generated alternative text:
added wait 
Overview 
O 
3e2b4ae6-838d-4071-3076-Se6010b91SS0 e3047cd9-ea0e-4bff-bbff-dcc87fd2e6bb development into master 
Files Updates Commits 
Merging pull request 
No merge conflicts 
Last checked 2m ago 
Description 
Updating pipeline: Copy from REST connector into ADLS Gen2

This should sync up the master branch and the development branch

If you try to directly publish from the development branch ,  it throws mud at you  – it says publish is only allowed from collaboration.

Machine generated alternative text:
Microsoft Azure Data Factory testdfsjvz 
merit •v 
/ C_) Refresh 
Factory Resources 
Pipelines 
Copy from REST connector i.. 
pipelinespec± 
pipelinetocc.pytc.avrc 
Datasets 
Data flows 
Templates 
Copy from REST con... 
Activities 
Move & transform 
Azure Data Explorer 
Azure Function 
Batch Service 
Databricks 
Data Lake Analytics 
General 
HDlnsight 
Iteration & conditionals 
Machine Learning 
Seve as template 
Valid-te 
> Debug 
•cid trigger 
Cac•y "t: 
CopyD?ts 
GetSearerToken 
Publish is only allowed from collaboration ('master') 
branch. Merge the changes to 'master'.

5. Switch to master branch and then hit publish

This is actually deploying the ADF pipeline to the service , so  it automatically internally kicks of a validation and will prompt you fix the validation errors.  If you have already validated the data factory job then you are good to go and this will publish the flow.

The publish branch is the branch where all of the arm templates are stored. The other branches dont have these and you may get a prompt indicating the same .

Pull request in ADF

In Azure Data Factory , we are presented with four choices while we complete a pull request



The Squash commit is the cleanest option , since it gives one commit on the target and the individual commits in the development/ feature branch is ignored. If we need to roll back , we know exactly which commit to go back to. I am sure there will be a difference of opinion in this matter , would love to hear your views on this.

uptime – wait how many 9’s do you need again?

i found this as the most cleanest way to explain uptime . This is usually the starting point of designing solutions . What kind of downtime can the business handle , we can then match the appropriate level of design with the required uptime

System uptime can be expressed as three nines, four nines, or five nines. These expressions indicate system uptimes of 99.9 percent, 99.99 percent, or 99.999 percent. To calculate system uptime in terms of hours, multiply these percentages by the number of hours in a year (8,760).

Uptime levelUptime hours per yearDowntime hours per year
99.9%8,751.24(8,760 – 8,751.24) = 8.76
99.99%8,759.12(8,760 – 8,759.12) = 0.88
99.999%8,759.91(8,760 – 8,759.91) = 0.09
https://docs.microsoft.com/en-us/learn/modules/evolving-world-of-data/3-systems-on-premise-vs-cloud

mount adls in DataBricks with SPN and oauth2

Here is the overall flow to mount the adls store in DataBricks using Oauth

steps to mount data lake file system in azure data bricks

1st step is to register an app in azure directory

this creates the application (client id) and the directory ( tenant ) id.

within Azure Ad app registration -> create a client secret -> once generated you have to copy the key value

once its hidden , it stays hidden forever – hence very important to rememeber to store the secret.

this secret key gets exchanged for a token at the time when we are trying to mount the file system

  • next step – store key in key vault

open up key vault -> click on generate /import -> paste in the secret generated in the previous step

  • once this step is done , go to data bricks
  • why there is no direct link to create a scope is beyond me , but there are two options – web method or databricks cli , i will use the web method to create the scope , will cover the databricks cli later – its my preferred approach but i have not

first step – go to key vault and get the dns name and resource id

once you get this – go to the web page as shown in step 6 below

and copy the corresponding DNS name and resource id

in this case we have created a scope called dbtravelscope

At this point we have created a scope with the client secret stored. We should be able to proceed with the steps outlined in this link below to get the adls mounted on Data Bricks

https://docs.databricks.com/data/data-sources/azure/azure-datalake-gen2.html#mount-an-azure-data-lake-storage-gen2-account-using-a-service-principal-and-oauth-20&language-scala

parquet vs avro

Ever wondered if parquet or avro is a better choice for your data lake file system , well let the numbers speak for themselves .

Here is a simple job in azure data factory that picks up a table in MySQL and loads it into data lake .

This image has an empty alt attribute; its file name is avrostat.jpg

We are loading 34.86 MB of data from the source system . This can be seen in the left under the MySQL section and on the right we have the data written which says 17.551MB , so AVRO format compressed the size by 2X . The copy task lasted about 2 minutes 39 seconds

Enter Parquet – tada !

Lets run the same job and write it to parquet file and see what the numbers look like

This image has an empty alt attribute; its file name is parquetstat.jpg

its the same file loaded into the same Data Lake and lets see the numbers this time . The Data written on the right side is 2.57 MB , thats 17X compression !!! . Notice the throughput and the copy duration numbers , those look so much better. …

Parquet is clearly the winner in this scenario , Avro is a row based format , stores the contents in binary with the meta data in JSON , however is columnar ( with row groups ) with summarization stats added to the metadata . The columnar lends itself it to much better compression and hence we see the performance gains above. enjoy !