Pushing at a fixed interval in push mode #46

Open
xiayank opened this issue Jun 13, 2017 · 2 comments

Comments

xiayank commented Jun 13, 2017

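My problem: in push mode I use RabbitMQ's handleDelivery callback to listen on my reduced price queue. The callback keeps running, because it listens on the queue continuously. I want to use a ScheduledExecutorService to run something at a fixed interval, but I have no way to stop the callback first.
I searched and could not find a way to stop the consumer when the MQ is empty. Since it cannot be stopped, my idea is to let the consumer keep listening and push whatever has been received from the queue at a fixed interval. With ScheduledExecutorService, however, I have to override run(), which takes no parameters, so I cannot pass in what I got from the queue.
Does anyone have a good approach? Thanks!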

Contributor

jygan commented Jun 14, 2017

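// run() takes no arguments, so hand the shared queue to the task through its constructor.
public class MyRunnableTask implements Runnable {
    private final Queue<Product> queue;

    public MyRunnableTask(Queue<Product> queue) {
        this.queue = queue; // keep a reference for run() to use later
    }

    @Override
    public void run() {
        // do stuff with this.queue here
    }
}

// scheduler is a ScheduledExecutorService; initialDelay and period are long values.
scheduler.scheduleAtFixedRate(new MyRunnableTask(queue), initialDelay, period, TimeUnit.SECONDS);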
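
To make the constructor idea above concrete, here is a minimal, self-contained sketch. It assumes a thread-safe `LinkedBlockingQueue` shared between the receiver and the periodic task. `BatchPushDemo`, `PushTask`, and the `pushed` list are hypothetical names made up for this illustration; a plain loop stands in for handleDelivery, and strings stand in for Product.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BatchPushDemo {

    /** Periodic task: it receives the shared queue once, through its constructor. */
    public static class PushTask implements Runnable {
        private final BlockingQueue<String> pending;
        private final List<List<String>> pushed; // hypothetical stand-in for the real push target

        public PushTask(BlockingQueue<String> pending, List<List<String>> pushed) {
            this.pending = pending;
            this.pushed = pushed;
        }

        @Override
        public void run() {
            List<String> batch = new ArrayList<>();
            pending.drainTo(batch); // move everything received so far into a local batch
            if (!batch.isEmpty()) {
                synchronized (pushed) {
                    pushed.add(batch); // "send" the batch
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        List<List<String>> pushed = new ArrayList<>();

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Short period so the demo finishes quickly; the real job would use seconds.
        scheduler.scheduleAtFixedRate(new PushTask(pending, pushed), 200, 200, TimeUnit.MILLISECONDS);

        // Stand-in for handleDelivery: it only enqueues and never waits for the push.
        for (int i = 0; i < 5; i++) {
            pending.add("product-" + i);
            Thread.sleep(100);
        }

        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        synchronized (pushed) {
            System.out.println("batches pushed: " + pushed);
        }
    }
}
```

With this shape the consumer never has to be stopped: it keeps enqueuing, and each scheduled run takes whatever has accumulated since the previous one. Items still in the queue when the scheduler shuts down are not pushed in this demo.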

Author

xiayank commented Jun 14, 2017

@jygan
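Here is my code. What I want is for the callback to keep adding each received product to a list, send the product list every 30 s, and then clear the ArrayList. From what I see when I run it, only the callback keeps adding products to the list. The overridden run() method does not send the list at the interval I configured; it ran only once, at the very beginning.
However, the crawler I wrote with ScheduledExecutorService does execute at a fixed rate, so I don't know what is wrong here.
This is my repo: https://github.com/xiayank/Price-Monitor-Push-Server.
Main code: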
Main():

final MySQLAccess sqlAccess = new MySQLAccess(mysql_host, mysql_user, mysql_psw,mysql_db);

		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection connection = null;

		try {
			connection = factory.newConnection();
			Channel channel = connection.createChannel();
			channel.queueDeclare("LevelOne",true,false,false,null);

			final ArrayList<Product> reducedList = new ArrayList<>();

			ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

			TestRunnable testRunnable = new TestRunnable(reducedList, sqlAccess);

			scheduledThreadPool.scheduleAtFixedRate(testRunnable,1,60, TimeUnit.SECONDS);

			Consumer consumer = new DefaultConsumer(channel){
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope,
										   AMQP.BasicProperties properties, byte[] body)
						throws IOException {

					Product product = (Product) SerializationUtils.deserialize(body);
					System.out.println(product.productId);
					reducedList.add(product);

				}
			};

			channel.basicConsume("LevelOne", true, consumer);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}

TestRunnable.class

public class TestRunnable implements Runnable {
    private static final long serialVersionUID = 1L;
    static final String mysql_host = "127.0.0.1:3306";
    static final String mysql_db = "project";
    static final String mysql_user = "root";
    static final String mysql_psw = "1127";
    ArrayList<Product> reducedList = null;
    MySQLAccess sqlAccess = null;

    public TestRunnable(ArrayList<Product> reducedList, MySQLAccess sqlAccess){
       this.reducedList = reducedList;
       this.sqlAccess = sqlAccess;
    }

    @Override
    public void run() {
        System.out.println("60 s run again!!!!");
        for (Product product : reducedList) {
            // send product list
        }
        reducedList.clear();
    }

}
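
One possible explanation for the "runs only once" symptom, which has not been verified against the repo: the documented behaviour of scheduleAtFixedRate is that if any execution of the task throws an exception, subsequent executions are suppressed, and nothing is printed unless the returned future is inspected. ArrayList is not thread-safe, so iterating over it in run() while handleDelivery adds to it from another thread may throw a ConcurrentModificationException. The sketch below only demonstrates the suppression rule and a try/catch guard around the task body. `SuppressedTaskDemo`, `guarded`, and `countRuns` are hypothetical names, and the failure is simulated.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SuppressedTaskDemo {

    /** Wraps a task so that a failure is logged instead of cancelling the schedule. */
    public static Runnable guarded(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("scheduled task failed: " + e);
            }
        };
    }

    /** Schedules an always-failing task every 50 ms for about 500 ms; returns how often it started. */
    public static int countRuns(boolean guard) {
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(guard ? guarded(failing) : failing, 0, 50, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and stop waiting
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        // Unguarded: the first exception cancels all later executions.
        System.out.println("unguarded runs: " + countRuns(false));
        // Guarded: the exception is caught inside the task, so the schedule continues.
        System.out.println("guarded runs:   " + countRuns(true));
    }
}
```

If this is indeed the cause, the guard only keeps the schedule alive. The shared list would still need a thread-safe replacement, for example a BlockingQueue drained inside run().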
