From Twitter to Kafka

Here is a sample Java program that listens for certain terms on Twitter and feeds the matching tweets to Kafka. In this case it listens for anything that includes "kafka".

package com.github.sjvz.tutorial2;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;


public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

List<String> terms = Lists.newArrayList("kafka");


public TwitterProducer() {}

public static void main(String[] args) {
new TwitterProducer().run();


}
public void run() {


/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
 BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);


// create a twitter client
// Attempts to establish a connection.
Client client = createTwitterClient(msgQueue);
client.connect();
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();


// add a shutdown hook

Runtime.getRuntime().addShutdownHook(new Thread (() -> {
logger.info("stopping application");
logger.info("shutting down client from twitter");
client.stop();
logger.info("closing producer");
producer.close();
logger.info("done!");
}));

// loop to send tweets to kafka

while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if ( msg != null) {
logger.info(msg);
producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if( e != null){
logger.error("something bad happened", e);
}
}
});
}
logger.info("End of application");
}


}

String consumerKey = "your key"
String consumerSecret = "your consumer secret" ;
String token = "your access token " ;
String secret = "your access secret";


public Client createTwitterClient(BlockingQueue<String> msgQueue) {


/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
 Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
// List<Long> followings = Lists.newArrayList(1234L, 566788L); // this is to follow people

// hosebirdEndpoint.followings(followings); // for people
hosebirdEndpoint.trackTerms(terms);

// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey,consumerSecret, token, secret);

ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));


Client hosebirdClient = builder.build();
return hosebirdClient;

}

public KafkaProducer<String,String> createKafkaProducer() {
String bootstrapservers = "192.168.1.105:9092";
// create producer properties

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());



// create producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

return producer;
}
}