您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15076|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    / P* s5 x# K# g: V7 B8 ?
  2. require_once __DIR__ . '/Workerman/Autoloader.php';! t% P5 G1 d6 F/ ^. B/ n; n$ ~

  3. " T! p; s) @7 L) v" F# m0 g
  4. $worker = new Worker();2 P$ m4 Z$ d. s- i, J' y) [& u
  5. // 4个进程
    : S$ r. @9 ]1 V
  6. $worker->count = 4;
    0 p( D3 b6 G. F6 P
  7. // 每个进程启动后在当前进程新增一个Worker监听1 k) `3 W7 w& [
  8. $worker->onWorkerStart = function($worker)
      B& g& I* L: R; ^
  9. {
    + g3 J5 }6 E0 [( F' J1 k, `
  10.     /**
    , x( K$ F, i' L- u9 R: K( e
  11.      * 4个进程启动的时候都创建2016端口的Worker) i4 U1 X; U8 ?% P. k6 d
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ; Z: r2 \' v( [- q( C
  13.      * 如果worker->count=1则不会报错. ^$ M, Q# L2 G7 J
  14.      */( |# u6 [* b, f5 y
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');: R: C$ l1 e* r8 f* u5 r
  16.     $inner_worker->onMessage = 'on_message';3 O) K9 Q3 _* H) u; l" |5 ?
  17.     // 执行监听。这里会报Address already in use错误+ ^* \) q" v: b4 v
  18.     $inner_worker->listen();/ r/ p, Z5 ~. a) @) I
  19. };
    4 V6 r& c4 I4 d' N

  20. 3 p& G( ~& _) P7 R6 s; m% s
  21. $worker->onMessage = 'on_message';! f0 b" I8 E  H6 W0 x3 p* A! c
  22. 4 Z0 d" _8 N! V# J1 B( _$ J. q
  23. function on_message($connection, $data)
    9 E+ z# r2 x* t. d
  24. {% ?4 {0 P; Z. G4 J  {( F
  25.     $connection->send("hello\n");
    ; N9 n- d; b1 L0 o; y/ F
  26. }
    + ~) ]* W+ ~. N

  27. & h& J. s. a& Z  P: y# z
  28. // 运行worker
    & M7 G# m! N# l: a9 F9 N
  29. Worker::runAll();
    1 b( Y4 J( t; U
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:6 Z; s& ^) ~; N6 n, U
  31. / Q# u+ Z% _* Y4 y; \7 k6 v
  32. use Workerman\Worker;
    % |" S7 ]5 J( y! D0 e
  33. require_once './Workerman/Autoloader.php';2 Z8 m) b% ~9 A# l# Y
  34. + O9 z# p" J0 a" @6 Q5 S$ I5 S4 d
  35. $worker = new Worker('text://0.0.0.0:2015');9 f. A3 ~5 @8 E; M4 E5 W# ~' [, f
  36. // 4个进程
    8 z) W* a: J  a1 G, @+ Z( ?9 v
  37. $worker->count = 4;
    3 g: ^, [( K8 Y" u
  38. // 每个进程启动后在当前进程新增一个Worker监听5 ]+ U8 V4 Q7 V( t. v& e9 k* s4 E
  39. $worker->onWorkerStart = function($worker)& H8 T8 G9 g" V4 C) f
  40. {7 b1 a7 S7 M# r
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');* r% q; O) w' J: J$ e! h# O$ D3 O
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    # e5 K2 C3 @9 p% J
  43.     $inner_worker->reusePort = true;% e: g! }$ ]4 m3 L& E$ C  q
  44.     $inner_worker->onMessage = 'on_message';6 S. T+ r# y8 ~1 h/ j) u  J
  45.     // 执行监听。正常监听不会报错% q: X! [) d  B" Q5 O4 r# A
  46.     $inner_worker->listen();6 [+ Y: r* Q6 h9 [+ o
  47. };
    - X( A) l& E. K6 l7 `
  48. $ |  F# r, y+ T. M7 q
  49. $worker->onMessage = 'on_message';
    2 R( d, T- v, s+ k
  50. 5 d# Z; ?+ F+ |. x! @: Y: h0 y
  51. function on_message($connection, $data)
    4 d# |$ O# v& E5 D- f
  52. {
    1 n8 q! e3 d$ T, D1 o, M
  53.     $connection->send("hello\n");
    7 r9 j0 E* V6 N2 e% s
  54. }4 O! O, t7 S& O3 f
  55. . G' ^0 @  L0 e
  56. // 运行worker* s" W2 n% g& k, M( j
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    ! k3 M3 N- Z4 i% s
  2. use Workerman\Worker;
    ' u) t* G3 _7 r5 A7 v4 }  `* i/ h
  3. require_once './Workerman/Autoloader.php';( q7 h2 H  R5 D9 |
  4. // 初始化一个worker容器,监听1234端口5 p. S* o# P5 c" B4 F# q1 I
  5. $worker = new Worker('websocket://0.0.0.0:1234');! q+ u( V3 V8 e
  6. . [5 d3 m) q* h8 U8 [+ j
  7. /*
    # G- [- T$ @4 x2 S
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误$ ?: e0 s+ N3 Y. A' O
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    9 |" F" ]: L0 c5 q$ L
  10. */9 k" T" [8 v* U! v6 i
  11. $worker->count = 1;, `- |6 b# L0 z5 F" s1 j* O( B
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    5 Q6 E9 i$ {/ F, ^/ Q) n
  13. $worker->onWorkerStart = function($worker)% Y, t( w/ Z6 o  r8 I  ?4 S
  14. {6 @7 v4 x+ W2 ~( c9 R1 J7 p4 C
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    & S! g  _0 @7 z: y% y
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    1 b6 p( q' J9 h: v2 Y- t! W
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    ' F5 \9 x# M' R; V  b3 \
  18.     {
    7 w1 J# _! e" n9 O. J+ x
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    + K* J8 g- j8 x3 l) i
  20.         $data = json_decode($buffer, true);
    $ Y# Q) [7 a1 `3 S, }
  21.         $uid = $data['uid'];
    ! o( T7 l3 R& t/ y  X
  22.         // 通过workerman,向uid的页面推送数据' w, y/ j+ Y- [  I8 S. d
  23.         $ret = sendMessageByUid($uid, $buffer);. F* d; x- v4 {& z! S7 R2 o/ |4 J
  24.         // 返回推送结果# S2 v/ ^) J5 q( C7 h- n
  25.         $connection->send($ret ? 'ok' : 'fail');2 [' J' I6 w. q: m) Y% C4 ?
  26.     };/ s) w# z* L! z5 {! ]- K
  27.     // ## 执行监听 ##; p# q) A! g- O6 r2 Q( t0 p5 V
  28.     $inner_text_worker->listen();/ o' {+ X, W4 w: ^) ^. B9 p
  29. };
    ) H, U, }) [- o' k8 W& T: M
  30. // 新增加一个属性,用来保存uid到connection的映射
    & F. Q/ \- u- O. J# D! K( k& R
  31. $worker->uidConnections = array();
    0 Y4 [8 m* f7 H' f
  32. // 当有客户端发来消息时执行的回调函数* R7 e9 K4 V9 F/ G# v
  33. $worker->onMessage = function($connection, $data)& ]$ }+ N) Y( i1 F
  34. {
    5 A1 g7 k" N, ?7 ?3 A
  35.     global $worker;9 ~6 W' j: S, }5 f4 Y' ~
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
      S8 n1 t: s. i" u9 ]
  37.     if(!isset($connection->uid)): N' a) d0 b' q+ P& r9 t
  38.     {
    " |5 s1 a' P8 P4 _  A5 n+ e
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    * j7 O. r% o  C' u+ P
  40.        $connection->uid = $data;" ^2 W, a  ~8 B, i
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    : C6 O6 `. a$ U; ~0 K
  42.         * 实现针对特定uid推送数据
    " u/ W! ]1 c: K: b' {7 X! r
  43.         */" h* S1 P! ]1 T: ]- u  U3 ^
  44.        $worker->uidConnections[$connection->uid] = $connection;  p$ x' p6 P5 H/ ?; `8 J' v* h
  45.        return;6 n4 b) _2 o1 g/ u' p
  46.     }
    # ^* h! E* A2 I
  47. };
    7 Q9 n  S' c- W! n% j; I

  48. " K  l5 C/ D; r" t$ s: y' I
  49. // 当有客户端连接断开时9 ]0 v3 W) y' v% M  x9 q$ @
  50. $worker->onClose = function($connection)9 Y/ O0 b3 |% @) k7 Y% p8 `' e% Y
  51. {6 x" o! r; S5 l8 _% _! b5 D! Z, c
  52.     global $worker;
      \$ {. e- }# l9 t+ O
  53.     if(isset($connection->uid))
    % X# d$ i( \& a- v; @$ n
  54.     {
    4 n8 i9 t7 p- O. y  _: A; d0 E
  55.         // 连接断开时删除映射
    ) @: q5 g+ O! [5 e& P+ Z9 T
  56.         unset($worker->uidConnections[$connection->uid]);
    + U, U0 T6 C, m: Q- C$ S
  57.     }# k4 q5 ?" j% }* R# z1 _
  58. };9 Y- c' F% |8 {9 I5 \; y
  59. 0 A' Y' N3 i, z* r5 o3 {! y/ H
  60. // 向所有验证的用户推送数据
    ) @+ K5 T! w. D* \  P
  61. function broadcast($message)
    6 j( ~- k% L# p+ o) v
  62. {* p3 C7 d. z1 {5 D& a! ~% F
  63.    global $worker;
    7 H) _5 u/ c/ x/ n* r' j
  64.    foreach($worker->uidConnections as $connection)
    + A6 v# `8 e8 Q, W$ U
  65.    {: P4 c) R( q6 E) E# L& N% P; o
  66.         $connection->send($message);
    - I4 A* `1 w( T  i1 |; U/ z- e
  67.    }
    " h3 q; y' ~/ g1 d
  68. }
    2 x: c! f3 y, v" ?) X/ c1 M

  69. . j) `' m0 L. _% b- @, m
  70. // 针对uid推送数据
    " o# N# v: }$ C: K) ^/ n
  71. function sendMessageByUid($uid, $message)( L0 k$ s- k) q! E
  72. {% |5 Y% u$ O4 h) A% r
  73.     global $worker;2 }7 `+ R2 k( H& {* E
  74.     if(isset($worker->uidConnections[$uid]))
    " W9 V1 c6 o; J* U& E0 y% Q
  75.     {
    / |3 v  V9 r. M5 U: L
  76.         $connection = $worker->uidConnections[$uid];+ j: K2 q; X1 p5 x5 a6 i4 `
  77.         $connection->send($message);
    / ~- g6 v" W& a1 d& F8 ?
  78.         return true;
    3 ~! `0 T' V* l
  79.     }5 j* F8 N/ ]3 R
  80.     return false;$ ~* c4 L0 }! P+ F/ Q2 n
  81. }
    / e7 ?1 |) @0 r7 C3 D4 A  n7 u* v
  82. 7 D  [7 y! u  I9 Q4 ^5 B* M
  83. // 运行所有的worker% Q# H1 ]* w: Q) }* |3 b
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');: `! p2 u/ o( y* w" t; M
  2. ws.onopen = function(){! f1 ^1 o' o1 O# b( k$ G$ a
  3.     var uid = 'uid1';( b4 W4 l/ n8 r. L  P1 ]
  4.     ws.send(uid);
    6 j. @. j6 N1 O
  5. };3 r$ \. y0 Z- x5 ?* N
  6. ws.onmessage = function(e){" O* ~5 p8 g  _& Q1 ~7 B' p
  7.     alert(e.data);
    5 a7 F/ V0 k8 U. }4 G
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    % U8 M) K" m, i* G4 ?
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    ( h2 R2 e6 u) ]4 O" g- a
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    , B( V: h) A* s0 X# \  r0 K
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');, N0 h( n# `* E( T
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    5 u! w- i6 a7 J0 E% I# X& Y+ O# K
  6. fwrite($client, json_encode($data)."\n");
    " R" [- f0 a6 |9 [7 y
  7. // 读取推送结果  S- g. o& Q! l3 Q. I! S
  8. echo fread($client, 8192);
复制代码

2 X0 O3 H; B) m% I3 q0 b0 G% A  ~. n; ~( |6 d) C+ F. N. r& S$ T, r
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-5-2 13:36 , Processed in 0.064331 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!