您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15079|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;+ S! ~3 Y2 R" m5 c
  2. require_once __DIR__ . '/Workerman/Autoloader.php';( L* T3 L' B% |' Y0 C. I5 a
  3. ' {+ t: @# t7 a3 N
  4. $worker = new Worker();
    6 P! A* x8 E3 v! R
  5. // 4个进程
    / M8 ~! B2 ^# y2 y; ^. w4 q
  6. $worker->count = 4;! B7 m$ g% @$ z( B1 P2 m  p- [4 J
  7. // 每个进程启动后在当前进程新增一个Worker监听7 B% R- m$ J$ K- m, Z: u) Y
  8. $worker->onWorkerStart = function($worker)
    . j7 M( b+ p: C6 w
  9. {
    8 G, n# Y4 Z0 d
  10.     /**; M9 Y+ N& `! C7 g" J
  11.      * 4个进程启动的时候都创建2016端口的Worker
    , U3 c6 k$ g2 n
  12.      * 当执行到worker->listen()时会报Address already in use错误7 I3 [! t3 U  ^! e2 e- q9 r/ T
  13.      * 如果worker->count=1则不会报错
    - N3 t" [8 H! o1 H
  14.      */, C2 N7 C9 F; P9 S1 d: v4 T
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');% g+ X5 W: S( [5 [
  16.     $inner_worker->onMessage = 'on_message';
    ) Q( H. P9 J4 ]% q) r- O
  17.     // 执行监听。这里会报Address already in use错误
    / K# z. Q! T5 a- V
  18.     $inner_worker->listen();
    + L+ t; f2 w. X2 k; o& v& H6 H8 }
  19. };
    # a9 _# R9 i" W+ }) J, i% ~
  20. : p3 I( R( e: e" Z9 M- X
  21. $worker->onMessage = 'on_message';
    ) b* i3 x* R+ l+ T1 D' q- j
  22. + m' x4 d: J( a4 }, B, H% N5 ^
  23. function on_message($connection, $data)
    $ k. Y, e6 @+ y* v0 `2 h
  24. {
    ! N/ v& d' {2 l( s- Y: \( ~. @+ x
  25.     $connection->send("hello\n");8 ]3 ~9 B- Z5 h7 z4 q# C
  26. }! P1 ?/ r) a# m0 K1 y' _6 ^' Z
  27. 8 a2 l/ V, q0 M8 v+ A4 M. ]
  28. // 运行worker# G* P! V, n7 @2 I! n8 k( ~
  29. Worker::runAll();
      H0 q! s6 p# G  c
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    4 M9 e% p1 D& r

  31.   e4 m& Z6 u! {' D% d/ Q; h. I: e
  32. use Workerman\Worker;8 K* u% b) K& Q% c2 j# D8 o2 M
  33. require_once './Workerman/Autoloader.php';
    9 \0 P$ P7 ~1 M" b

  34. # i; W+ o6 [5 A( m# S+ k4 t: o
  35. $worker = new Worker('text://0.0.0.0:2015');
    9 W* f: O, {1 T4 X- n" P9 u
  36. // 4个进程$ \5 e6 d. K5 |0 o$ j- a* _; V
  37. $worker->count = 4;
    $ A' v4 P: D6 d/ d  i' ?
  38. // 每个进程启动后在当前进程新增一个Worker监听
    : `' `- r" ~. o! n+ A) q/ ^( ~- q
  39. $worker->onWorkerStart = function($worker)3 A/ E# `# L: D
  40. {- e$ u$ j9 j. V4 X8 q7 s
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');* B- e" K  {. ~; m% l* Q$ B# X! {
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)) |! T0 z. C; ?2 O! Q6 ^
  43.     $inner_worker->reusePort = true;
    # g% p+ O- u6 z- ~- t- m
  44.     $inner_worker->onMessage = 'on_message';
    4 H# L! e7 k- M$ k# v" n4 ~: J5 c3 w
  45.     // 执行监听。正常监听不会报错- y5 ~% S% H1 C/ D3 o+ d6 t& `, u
  46.     $inner_worker->listen();7 Y$ ^& d9 W6 G# t( A/ Q7 ]9 E
  47. };5 b/ G/ o8 _! X: X/ Y

  48. 6 f3 p$ X& m5 u& X
  49. $worker->onMessage = 'on_message';
    5 U# d4 [, P$ j, W: R

  50. % `* _6 I$ T! c6 a- C# M' W( v
  51. function on_message($connection, $data), b- p8 [1 r$ n: t8 j
  52. {
    . D# T/ x0 `% V0 r/ w0 X+ j' h. L
  53.     $connection->send("hello\n");
    2 [9 w: y, T' r, P1 t5 j: g
  54. }
    ) r" _+ ~+ u' q( ?

  55. . D4 Q) `  {0 [; n0 h
  56. // 运行worker
    ' W  F& b& M$ e) d! c
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    0 J, x  w8 S* q
  2. use Workerman\Worker;
    ( {, m1 G* _( y
  3. require_once './Workerman/Autoloader.php';
    5 W+ d- j) i' {$ g, x1 l# G2 H6 m
  4. // 初始化一个worker容器,监听1234端口
    9 |1 J6 P# f8 L: [2 E' H
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    8 E) Y9 W, H: o- Y" _# t

  6. : n+ d1 p) e# b  G% O# T
  7. /*
    4 O, e2 x  ~' m, H1 J! Q* t
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误& Q1 K, e  y9 V& u! L- l
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    8 s/ P. d  d& S' B
  10. */. [  N* D: Y6 @
  11. $worker->count = 1;: D! F( n/ ^/ m0 i4 j2 q
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口& m% K: w1 o) w* C  e3 T( |! C
  13. $worker->onWorkerStart = function($worker)% N0 j1 ]# c6 k; X5 q6 a$ l# _
  14. {
    5 y0 s5 ~& \9 H( K
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符* L' _+ {- a1 p6 W! [
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');# c. S0 m! o  n6 D! J: Q1 H6 z
  17.     $inner_text_worker->onMessage = function($connection, $buffer)- l0 g8 K) W0 A" A! j
  18.     {5 `. Y8 r8 O( ^& C+ h; a
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据" |% @/ S1 e7 S& d- @7 W# m
  20.         $data = json_decode($buffer, true);  f# ?2 q5 o; U& @; m
  21.         $uid = $data['uid'];
    * `# o5 r5 Z4 f
  22.         // 通过workerman,向uid的页面推送数据
    5 {" E) s5 o7 R1 ?  P( S
  23.         $ret = sendMessageByUid($uid, $buffer);9 t) e' |9 o' R' ]1 R7 e4 t& n' A
  24.         // 返回推送结果
    5 U, A- j/ ~% {3 R2 z- e
  25.         $connection->send($ret ? 'ok' : 'fail');' S( m% P& Z+ a- m+ X6 o& V
  26.     };
    9 j" x, ~" M9 B9 X" K
  27.     // ## 执行监听 ##( }2 C" C- j" H/ n. `6 y. w/ T
  28.     $inner_text_worker->listen();
    9 W  z# ?6 c' Y" H, p
  29. };
    , A$ g' ]/ Z- r
  30. // 新增加一个属性,用来保存uid到connection的映射0 g/ X/ b8 h0 _. }5 O. S# |- }' n
  31. $worker->uidConnections = array();
    * K6 g; O  H" k. W( X: k  D
  32. // 当有客户端发来消息时执行的回调函数6 Z% Y7 q- L& N, u$ e% U
  33. $worker->onMessage = function($connection, $data)
    . l9 w  g+ F/ O# ~, Q( v. W. ]: U
  34. {% L, q: J3 h5 D. @
  35.     global $worker;: a% o: b( x' d+ `
  36.     // 判断当前客户端是否已经验证,既是否设置了uid. b1 i3 ?; m/ y
  37.     if(!isset($connection->uid)); Y: {" c4 K, h( n& i9 |- t
  38.     {  q0 g2 I, ~) z- r6 a% |
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    9 ^- Y! {4 q/ K% p! ^/ V
  40.        $connection->uid = $data;
      H) B: D! f1 D* |1 H; v- T
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,9 `/ Y1 B7 h4 [+ ^: y
  42.         * 实现针对特定uid推送数据( O4 G( K& z9 I  y; O( ^0 e: w
  43.         */
    , @+ p3 ]( I' P
  44.        $worker->uidConnections[$connection->uid] = $connection;
    8 I6 D5 z0 O7 [( y
  45.        return;6 H: X, g/ l6 s; ^3 p3 Q
  46.     }
    ; L3 P) G, s1 W) K4 j% y
  47. };
    % a4 F) O+ g: l9 ]8 K5 i

  48. $ H5 a7 v/ W3 s7 p9 S4 s
  49. // 当有客户端连接断开时. h0 ^( n: _6 v4 V5 D4 o( Q
  50. $worker->onClose = function($connection)
    ) V, }  w* w( M0 n6 m
  51. {1 B# R9 J  O! w. @) u
  52.     global $worker;
    : }' T; x. x8 b4 v4 E8 i
  53.     if(isset($connection->uid))/ Q/ C8 l' a+ B& ?* E, v
  54.     {( i) ^* j8 I  I0 y/ v/ y$ f
  55.         // 连接断开时删除映射
    ' T7 E* [% }* s$ J" ~0 X0 H( [
  56.         unset($worker->uidConnections[$connection->uid]);$ O* |& N. |- Q: \4 F1 G6 x  V
  57.     }- j1 L  b/ N) e  P6 Q
  58. };
    5 b3 }& e- ]9 U$ T

  59. " ^( @8 z: t) i* X
  60. // 向所有验证的用户推送数据
    6 Z. \! q) D: S/ E  {. g2 Q$ V
  61. function broadcast($message)* F/ R5 ?# E/ x9 n0 F
  62. {3 g3 {6 i! g+ J: i
  63.    global $worker;6 U3 _& _8 d3 _6 P! j! u) H, J
  64.    foreach($worker->uidConnections as $connection)8 P5 T6 _3 h: W8 b0 ?. ?
  65.    {
      j5 H+ |& q+ r7 s
  66.         $connection->send($message);
    4 {4 l  n7 A+ |  @5 g' G# n
  67.    }
    5 y3 S$ `: \( z4 [5 g! }
  68. }
    0 T1 I1 W/ T6 ^8 n
  69. & |) W3 q1 J4 N2 N& ~% A
  70. // 针对uid推送数据0 I0 }0 i6 ]9 `" u/ U
  71. function sendMessageByUid($uid, $message)- j' t! h# z$ j6 A! Z
  72. {
    ) @# D! `: q* {6 f" @0 H
  73.     global $worker;, ^) O) [) m& c; F- p2 ?/ V
  74.     if(isset($worker->uidConnections[$uid]))
    * G0 m) d# @( p* G' f9 E* q4 [
  75.     {
    3 y& ^# O% k) g0 ~  n/ K
  76.         $connection = $worker->uidConnections[$uid];
    4 X6 }( e7 Y1 l& ^  V  N
  77.         $connection->send($message);
    + ?% }) Z, a9 F! ~) U' t
  78.         return true;
    : k4 B2 w4 t3 z7 N- L; }
  79.     }3 N8 W  T- [* d3 g
  80.     return false;$ ~" @  E: ?; W7 y% w0 I
  81. }
    / D" V7 {6 d$ ?: O

  82. 1 p, R( w; b: `9 f9 \/ v
  83. // 运行所有的worker
    & R2 [, B+ `) y' P1 k
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');& b$ ^. w# }2 x  B1 D% l! {
  2. ws.onopen = function(){
    8 V: v1 H+ l2 b. [0 w, d
  3.     var uid = 'uid1';1 H9 O/ i) U. x1 {# ~
  4.     ws.send(uid);
    7 a2 L; s  v0 O6 |* R  x
  5. };& M( P7 N$ F) F- t5 G% y
  6. ws.onmessage = function(e){
    / I* @. {( c4 Z
  7.     alert(e.data);- d8 m, [1 x5 }! k! x
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口( w7 a  m2 @9 J7 F+ E- i% O
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    ( ~/ M: w$ K( }+ u9 U( Y
  3. // 推送的数据,包含uid字段,表示是给这个uid推送% R$ T5 m- v+ G+ R. Z
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    % t% h+ Y) V* D; m* W4 r2 w
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    - e" G0 H& W& n, W: B
  6. fwrite($client, json_encode($data)."\n");6 E' N# I" S* g( d4 X0 l# I) h
  7. // 读取推送结果2 j! {, {2 ?5 l1 L* k) }# ^
  8. echo fread($client, 8192);
复制代码

5 w/ h! d  U9 \/ R8 R' o
* e$ ?: H3 I7 o
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-5-2 18:26 , Processed in 0.074343 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!