kafka-php - 盘古开地 - 博客园

kafka-php

GitHub repository for kafka-php: https://github.com/jacky5059/kafka-php

Producer example code


<?php
set_include_path(
    implode(PATH_SEPARATOR, array(
        realpath(__DIR__ . '/../lib'),
        get_include_path(),
    ))
);
require 'autoloader.php';

$host  = 'localhost';
$port  = 9092;
$topic = 'test';

$producer = new Kafka_Producer($host, $port, Kafka_Encoder::COMPRESSION_NONE);
$in = fopen('php://stdin', 'r');

while (true) {
    echo "\nEnter comma separated messages:\n";
    $line = fgets($in);
    if ($line === false) {
        break; // end of input: stop instead of looping forever
    }
    $messages = explode(',', $line);
    foreach (array_keys($messages) as $k) {
        // uncomment to strip surrounding whitespace, including the trailing newline
        //$messages[$k] = trim($messages[$k]);
    }
    $bytes = $producer->send($messages, $topic);
    printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);
}
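
The producer example above splits the input line on commas but leaves the trim() call commented out, so the last message keeps its trailing newline and an input such as "a,,b" produces an empty message. A minimal sketch of one way to clean the input before sending is shown here. The helper name parseMessages is hypothetical and is not part of kafka-php.

```php
<?php
// Hypothetical helper (not part of kafka-php): split one input line into
// trimmed, non-empty messages.
function parseMessages(string $line): array
{
    // Split on commas, then strip whitespace (including the trailing newline).
    $parts = array_map('trim', explode(',', $line));

    // Drop empty entries such as those produced by "a,,b" or a bare newline,
    // and reindex so the result is a plain list.
    return array_values(array_filter($parts, function ($message) {
        return $message !== '';
    }));
}

// Yields the three messages 'hello', 'world' and 'kafka'.
print_r(parseMessages("hello, world ,,kafka\n"));
```

In the producer loop, the result of parseMessages($line) could be passed to $producer->send() in place of the raw explode() output, skipping the send when the list is empty.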

Simple consumer example code


<?php
set_include_path(
    implode(PATH_SEPARATOR, array(
        realpath(__DIR__ . '/../lib'),
        get_include_path(),
    ))
);
require 'autoloader.php';

$host    = 'localhost';
$zkPort  = 2181; // zookeeper (not used by the simple consumer)
$kPort   = 9092; // kafka server
$topic   = 'test';
$maxSize = 10000000; // fetch size in bytes (about 10 MB)
$socketTimeout = 2;

$offset    = 0;
$partition = 0;
$nMessages = 0;

$consumer = new Kafka_SimpleConsumer($host, $kPort, $socketTimeout, $maxSize);
while (true) {
    try {
        // create a fetch request for topic "test", partition 0, the current offset and a fetch size of $maxSize bytes
        $fetchRequest = new Kafka_FetchRequest($topic, $partition, $offset, $maxSize);
        // get the message set from the consumer and print the messages out
        $partialOffset = 0;
        $messages = $consumer->fetch($fetchRequest);
        foreach ($messages as $msg) {
            ++$nMessages;
            echo "\nconsumed[{$offset}][{$partialOffset}][msg #{$nMessages}]: " . $msg->payload();
            $partialOffset = $messages->validBytes();
        }
        // advance the offset by the number of valid bytes read in this fetch
        $offset += $messages->validBytes();
        //echo "\n---[Advancing offset to $offset]------(".date('H:i:s').")";
        unset($fetchRequest);
        //sleep(2);
    } catch (Exception $e) {
        // probably consumed all items in the queue; wait and retry
        echo "\nERROR: " . get_class($e) . ': ' . $e->getMessage() . "\n" . $e->getTraceAsString() . "\n";
        sleep(2);
    }
}
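
The simple consumer above tracks its position as a byte offset and advances it by validBytes() after each fetch. The sketch below illustrates that bookkeeping without a broker, assuming PHP 7.1 or later. makeFakeFetcher and pollAll are hypothetical names, and the fake fetcher only loosely imitates fetch() and validBytes(); it is not the kafka-php API and ignores any per-message framing overhead.

```php
<?php
// Hypothetical stand-in for a broker: the "log" is a list of payloads laid
// out back to back. The returned closure takes (offset, maxSize) and returns
// [payloads, validBytes] for the messages starting at that byte offset.
function makeFakeFetcher(array $log): callable
{
    return function (int $offset, int $maxSize) use ($log): array {
        $payloads = [];
        $validBytes = 0;
        $position = 0; // byte position of the current payload in the log
        foreach ($log as $payload) {
            $size = strlen($payload);
            if ($position >= $offset) {
                if ($validBytes + $size > $maxSize) {
                    break; // this payload does not fit in the current fetch
                }
                $payloads[] = $payload;
                $validBytes += $size;
            }
            $position += $size;
        }
        return [$payloads, $validBytes];
    };
}

// Fetch repeatedly, advancing the offset once per fetch, until nothing new
// comes back. A long-running consumer would sleep and retry instead of stopping.
function pollAll(callable $fetch, int $offset, int $maxSize): array
{
    $consumed = [];
    while (true) {
        [$payloads, $validBytes] = $fetch($offset, $maxSize);
        if ($validBytes === 0) {
            break;
        }
        foreach ($payloads as $payload) {
            $consumed[] = $payload;
        }
        $offset += $validBytes;
    }
    return [$consumed, $offset];
}

[$consumed, $finalOffset] = pollAll(makeFakeFetcher(['aa', 'bbb', 'c']), 0, 4);
echo implode(',', $consumed) . " / final offset: $finalOffset\n";
```

In this sketch a payload larger than $maxSize is never returned, so the loop stops before it. This is one reason to keep the fetch size above the largest expected message.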

ZooKeeper-based consumer (zkconsumer) example code


<?php
set_include_path(
    implode(PATH_SEPARATOR, array(
        realpath(__DIR__ . '/../lib'),
        get_include_path(),
    ))
);
require 'autoloader.php';

// zookeeper address (one or more, separated by commas);
// ZooKeeper's default client port is 2181, so use the port your ensemble actually listens on
$zkaddress = 'localhost:8121';
// kafka topic to consume from
$topic = 'testtopic';
// kafka consumer group
$group = 'testgroup';
// socket buffer size: must be greater than the largest message in the queue
$socketBufferSize = 10485760; // 10 MB
// approximate max number of bytes to get in a batch
$maxBatchSize = 20971520; // 20 MB

$zookeeper = new Zookeeper($zkaddress);

$zkconsumer = new Kafka_ZookeeperConsumer(
    new Kafka_Registry_Topic($zookeeper),
    new Kafka_Registry_Broker($zookeeper),
    new Kafka_Registry_Offset($zookeeper, $group),
    $topic,
    $socketBufferSize
);

$messages  = array();
$exception = null; // stays null when the loop finishes without an error
try {
    foreach ($zkconsumer as $message) {
        // either process each message one by one, or collect them and process them in batches
        $messages[] = $message;
        if ($zkconsumer->getReadBytes() >= $maxBatchSize) {
            break;
        }
    }
} catch (Kafka_Exception_OffsetOutOfRange $exception) {
    // if we haven't received any messages, resync the offsets for the next time, then bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        $zkconsumer->resyncOffsets();
        die($exception->getMessage());
    }
    // if we did receive some messages before the exception, carry on.
} catch (Kafka_Exception_Socket_Connection $exception) {
    // deal with it below
} catch (Kafka_Exception $exception) {
    // deal with it below
}

if (null !== $exception) {
    // if we haven't received any messages, bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        die($exception->getMessage());
    }
    // otherwise log the error, commit the offsets for the messages read so far and return the data
}

// process the data in batches, wait for ACK
// (doSomethingWithTheMessages is a placeholder: supply your own function that returns true on success)
$success = doSomethingWithTheMessages($messages);

// Once the data is processed successfully, commit the byte offsets.
if ($success) {
    $zkconsumer->commitOffsets();
}

// get an approximate figure on the size of the queue
try {
    echo "\nRemaining bytes in queue: " . $zkconsumer->getRemainingSize();
} catch (Kafka_Exception_Socket_Connection $exception) {
    die($exception->getMessage());
} catch (Kafka_Exception $exception) {
    die($exception->getMessage());
}
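
The ZooKeeper consumer example above collects messages until getReadBytes() reaches $maxBatchSize, checking the limit only after each message has been added. That is why the limit is described as approximate. The sketch below shows the same pattern on plain strings, assuming PHP 7.1 or later. collectBatch is a hypothetical helper, and strlen() stands in for the consumer's byte counter.

```php
<?php
// Hypothetical helper (not part of kafka-php): collect messages from any
// iterable until roughly $maxBatchSize bytes have been read.
function collectBatch(iterable $source, int $maxBatchSize): array
{
    $batch = [];
    $readBytes = 0;
    foreach ($source as $message) {
        $batch[] = $message;
        $readBytes += strlen($message);
        // The check runs after the message is added, so the batch can
        // overshoot the limit by up to one message.
        if ($readBytes >= $maxBatchSize) {
            break;
        }
    }
    return [$batch, $readBytes];
}

// With a 6-byte limit, the second 4-byte message pushes the total to 8 bytes
// and ends the batch; the third message is left for the next run.
[$batch, $bytes] = collectBatch(['abcd', 'efgh', 'ijkl'], 6);
echo count($batch) . " messages, $bytes bytes\n";
```

Checking the limit before appending instead would make it a hard cap, at the cost of never making progress on a message larger than the limit.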

Created at: 2018-10-29 13:04:43